Skip to content

Commit 9904d56

Browse files
authored
Memory optimizations (#39)
This PR tries to reduce the memory cost of Neurow subscriptions. **TL;DR** By hibernating SSE monitors and manually running the garbage collector in SSE processes, the PR decreases the memory cost of SSE subscriptions from about 63ko per subscription to 30ko per subscription. In a server that handles 80k SSE subscription, it should save about **2.5Go of RAM**. This optimization should however slightly increase the CPU usage because garbage collection run more often. Note: This PR not actually change the max memory required by a Neurow server. The server would eventually run garbage collection in order to free memory to re-use it just after. It just release the memory without having to wait for a gc when the server needs more RAM. This makes its behavior more predictable, and allows to set auto-scaling rules based on the actual RAM usage. ## Analysis The function `Neurow.Observability.System.process_groups` analyses the process memory usage: It groups processes by name and count their memory cost, number of process and number of messages. It can be used in the IEX console by calling `Neurow.Observability.System.process_groups`, or by calling `GET /process_stats` on the internal API. Locally, by starting 2k concurrent subscriptions that subscribe to 10 shared topics, and then by continuously sending messages on these 10 topics, here are some interesting outliers: | name / initial function | current function | process count | memory | cost per subscription (ko) | cost for 80k subscriptions (mo) | -- | -- | -- | -- | -- | -- | cowboy_stream_h:request_process/3 | Elixir.Neurow.PublicApi.Endpoint:loop/7 | 2000 | 80791512 | 39.45 | 3,081.95 cowboy_clear:connection_process/4 | cowboy_http:loop/1 | 2000 | 43334224 | 21.16 | 1,653.07 Elixir.Neurow.PublicApi.SSEMonitor:init/1 | gen_server:loop/7 | 2000 | 5561152 | 2.72 | 212.14   |   |   |   |   |     |   |   | **Total** | **63.32** | **4,947.16** | No issue were found in the number of messages. ## Optimizations ### Hibernate SSE Monitors Erlang allows to manually put processes in [hibernation](https://www.erlang.org/doc/apps/erts/erlang.html#hibernate/3): > Puts the calling process into a wait state where its memory allocation has been reduced as much as possible. This is useful if the process does not expect to receive any messages soon. SSEMonitor is a genserver, putting it in hibernation is simple: It just requires to return `{:ok, state, :hibernate}` from the init function. ### Manual garbage collection in SSE subscriptions Because SSE requests handlers are not genservers, it is much more complex to hibernate them. `:erlang.hibernate` exists and use it with `Process.send_after` could allow to implement a ticker, but `:erlang.hibernate` erases the process call stack. That means all instructions that should run after that `Neurow.PublicApi.Endpoint#subscribe` returns will actually not run if the SSE process hibernates. So instead of hibernating we just first force a garbage collection during each SSE passive loop. Compare to an actual hibernation the call stack is not deleted, and all live data are not moved to a continuous heap. But it still provides significant improvements. ### Results | name / initial function | Current function | process count | memory | cost per subscription (Ko) | cost for 80k subscriptions (Mo) | -- | -- | -- | -- | -- | -- | cowboy_stream_h:request_process/3 | Elixir.Neurow.PublicApi.Endpoint:loop/7 | 2000 | 14767616 | 7.21 | 563.34 cowboy_clear:connection_process/4 | cowboy_http:loop/1 | 2000 | 43316544 | 21.15 | 1,652.40 Elixir.Neurow.PublicApi.SSEMonitor:init/1 | gen_server:loop/7 | 2000 | 2656000 | 1.30 | 101.32   |   |   |   |   |     |   |   | Total | 29.66 | 2,317.05 | The subscription cost is reduced from 63ko to 30ko. In a server that handles 80k SSE subscriptions, it should save about **2.5Go of RAM** A debounce is added to the manual garbage collection to have max 1 manual GC per SSE process every 60s. Actually the SSE passive loop is triggered much less often, the amount of manual GC should be much lower. A manual GC is always triggered during the first run of the SSE passive loop.
1 parent 7fad989 commit 9904d56

14 files changed

+338
-200
lines changed

neurow/lib/metric_plug_exporter.ex

Lines changed: 0 additions & 3 deletions
This file was deleted.

neurow/lib/neurow/application.ex

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,7 @@ defmodule Neurow.Application do
9292
[]
9393
end
9494

95-
MetricsPlugExporter.setup()
96-
Neurow.Stats.setup()
95+
Neurow.Observability.setup()
9796
JOSE.json_module(:jiffy)
9897

9998
opts = [strategy: :one_for_one, name: Neurow.Supervisor]

neurow/lib/neurow/broker/receiver_shard_manager.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ defmodule Neurow.Broker.ReceiverShardManager do
4242
end
4343

4444
def rotate do
45-
Neurow.Stats.MessageBroker.inc_history_rotate()
45+
Neurow.Observability.MessageBrokerStats.inc_history_rotate()
4646

4747
Enum.each(receiver_shards(), fn {_shard, pid} ->
4848
send(pid, {:rotate})

neurow/lib/neurow/internal_api/endpoint.ex

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ defmodule Neurow.InternalApi.Endpoint do
66
alias Neurow.Broker.Message
77

88
use Plug.Router
9-
plug(MetricsPlugExporter)
9+
plug(Neurow.Observability.MetricsPlugExporter)
1010

1111
plug(Neurow.JwtAuthPlug,
1212
credential_headers: ["x-interservice-authorization", "authorization"],
@@ -16,8 +16,15 @@ defmodule Neurow.InternalApi.Endpoint do
1616
&Neurow.Configuration.internal_api_verbose_authentication_errors/0,
1717
max_lifetime: &Neurow.Configuration.internal_api_jwt_max_lifetime/0,
1818
send_forbidden: &Neurow.InternalApi.Endpoint.send_forbidden/3,
19-
inc_error_callback: &Neurow.Stats.Security.inc_jwt_errors_internal/0,
20-
exclude_path_prefixes: ["/ping", "/nodes", "/cluster_size_above", "/history"]
19+
inc_error_callback: &Neurow.Observability.SecurityStats.inc_jwt_errors_internal/0,
20+
exclude_path_prefixes: [
21+
"/ping",
22+
"/nodes",
23+
"/cluster_size_above",
24+
"/history",
25+
"/process_stats",
26+
"/favicon.ico"
27+
]
2128
)
2229

2330
plug(:match)
@@ -55,6 +62,18 @@ defmodule Neurow.InternalApi.Endpoint do
5562
)
5663
end
5764

65+
get "/process_stats" do
66+
conn
67+
|> put_resp_header("content-type", "application/json")
68+
|> send_resp(
69+
200,
70+
Jason.encode!(
71+
Neurow.Observability.SystemStats.process_groups()
72+
|> Enum.map(&Map.from_struct/1)
73+
)
74+
)
75+
end
76+
5877
get "/cluster_size_above/:size" do
5978
size = String.to_integer(size)
6079
cluster_size = length(Node.list()) + 1
@@ -97,7 +116,7 @@ defmodule Neurow.InternalApi.Endpoint do
97116
end)
98117
end)
99118

100-
Neurow.Stats.MessageBroker.inc_message_published(issuer)
119+
Neurow.Observability.MessageBrokerStats.inc_message_published(issuer)
101120

102121
conn
103122
|> put_resp_header("content-type", "application/json")
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
defmodule Neurow.Observability.HttpInterfacesStats do
2+
use Prometheus.Metric
3+
4+
def setup() do
5+
Summary.declare(
6+
name: :http_request_duration_ms,
7+
labels: [:interface],
8+
help: "HTTP request duration"
9+
)
10+
11+
Counter.declare(
12+
name: :http_request_count,
13+
labels: [:interface, :status],
14+
help: "HTTP request count"
15+
)
16+
17+
Summary.reset(name: :http_request_duration_ms, labels: [:public_api])
18+
Summary.reset(name: :http_request_duration_ms, labels: [:internal_api])
19+
20+
# Please read https://github.com/beam-telemetry/cowboy_telemetry
21+
:telemetry.attach_many(
22+
"cowboy_telemetry_handler",
23+
[
24+
[:cowboy, :request, :stop]
25+
],
26+
&Neurow.Observability.HttpInterfacesStats.handle_event/4,
27+
nil
28+
)
29+
end
30+
31+
def handle_event([:cowboy, :request, :stop], measurements, metadata, _config) do
32+
endpoint =
33+
case metadata[:req][:ref] do
34+
Neurow.PublicApi.Endpoint.HTTP -> :public_api
35+
Neurow.InternalApi.Endpoint.HTTP -> :internal_api
36+
end
37+
38+
duration_ms = System.convert_time_unit(measurements[:duration], :native, :millisecond)
39+
resp_status = metadata[:resp_status]
40+
41+
Counter.inc(name: :http_request_count, labels: [endpoint, resp_status])
42+
Summary.observe([name: :http_request_duration_ms, labels: [endpoint]], duration_ms)
43+
end
44+
end
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
defmodule Neurow.Observability.MessageBrokerStats do
2+
use Prometheus.Metric
3+
4+
def setup() do
5+
Gauge.declare(
6+
name: :concurrent_subscription,
7+
labels: [:issuer],
8+
help: "Amount of concurrent topic subscriptions"
9+
)
10+
11+
Counter.declare(
12+
name: :subscription_lifecycle,
13+
labels: [:kind, :issuer],
14+
help: "Count subscriptions and unsubscriptions"
15+
)
16+
17+
Summary.declare(
18+
name: :subscription_duration_ms,
19+
labels: [:issuer],
20+
help: "Duration of topic subscriptions"
21+
)
22+
23+
Counter.declare(
24+
name: :message,
25+
labels: [:kind, :issuer],
26+
help: "Messages sent through topic subscriptions"
27+
)
28+
29+
Counter.declare(
30+
name: :history_rotate,
31+
help: "History rotate counter"
32+
)
33+
34+
Gauge.declare(
35+
name: :topic_count,
36+
help: "Number of topics in the message history"
37+
)
38+
39+
Counter.reset(name: :history_rotate)
40+
41+
Gauge.set([name: :topic_count], 0)
42+
43+
Enum.each(Neurow.Configuration.issuers(), fn issuer ->
44+
Gauge.set([name: :concurrent_subscription, labels: [issuer]], 0)
45+
Counter.reset(name: :subscription_lifecycle, labels: [:created, issuer])
46+
Counter.reset(name: :subscription_lifecycle, labels: [:released, issuer])
47+
Counter.reset(name: :message, labels: [:published, issuer])
48+
Counter.reset(name: :message, labels: [:sent, issuer])
49+
Summary.reset(name: :subscription_duration_ms, labels: [issuer])
50+
end)
51+
52+
Periodic.start_link(
53+
run: fn ->
54+
Gauge.set([name: :topic_count], Neurow.Broker.ReceiverShardManager.topic_count())
55+
end,
56+
every: :timer.seconds(10)
57+
)
58+
end
59+
60+
def inc_subscriptions(issuer) do
61+
Counter.inc(name: :subscription_lifecycle, labels: [:created, issuer])
62+
Gauge.inc(name: :concurrent_subscription, labels: [issuer])
63+
end
64+
65+
def dec_subscriptions(issuer, duration_ms) do
66+
Counter.inc(name: :subscription_lifecycle, labels: [:released, issuer])
67+
Gauge.dec(name: :concurrent_subscription, labels: [issuer])
68+
Summary.observe([name: :subscription_duration_ms, labels: [issuer]], duration_ms)
69+
end
70+
71+
def inc_message_published(issuer) do
72+
Counter.inc(name: :message, labels: [:published, issuer])
73+
end
74+
75+
def inc_message_sent(issuer) do
76+
Counter.inc(name: :message, labels: [:sent, issuer])
77+
end
78+
79+
def inc_history_rotate() do
80+
Counter.inc(name: :history_rotate)
81+
end
82+
end
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
defmodule Neurow.Observability.MetricsPlugExporter do
2+
use Prometheus.PlugExporter
3+
end
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
defmodule Neurow.Observability do
2+
use Prometheus.Metric
3+
4+
def setup() do
5+
Neurow.Observability.MessageBrokerStats.setup()
6+
Neurow.Observability.HttpInterfacesStats.setup()
7+
Neurow.Observability.MetricsPlugExporter.setup()
8+
Neurow.Observability.SecurityStats.setup()
9+
Neurow.Observability.SystemStats.setup()
10+
end
11+
end
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
defmodule Neurow.Observability.SecurityStats do
2+
use Prometheus.Metric
3+
4+
def setup() do
5+
Counter.declare(
6+
name: :jwt_errors,
7+
labels: [:interface],
8+
help: "JWT Errors"
9+
)
10+
end
11+
12+
def inc_jwt_errors_public() do
13+
Counter.inc(name: :jwt_errors, labels: [:public])
14+
end
15+
16+
def inc_jwt_errors_internal() do
17+
Counter.inc(name: :jwt_errors, labels: [:internal])
18+
end
19+
end
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
defmodule Neurow.Observability.SystemStats do
2+
use Prometheus.Metric
3+
4+
def setup() do
5+
Gauge.declare(
6+
name: :memory_usage,
7+
help: "Memory usage"
8+
)
9+
10+
Boolean.declare(
11+
name: :stopping,
12+
help: "The node is currently stopping"
13+
)
14+
15+
Gauge.set([name: :memory_usage], 0)
16+
Boolean.set([name: :stopping], false)
17+
18+
Periodic.start_link(
19+
run: fn -> Gauge.set([name: :memory_usage], :recon_alloc.memory(:usage)) end,
20+
every: :timer.seconds(10)
21+
)
22+
end
23+
24+
def report_shutdown() do
25+
Boolean.set([name: :stopping], true)
26+
end
27+
28+
defmodule ProcessesStats do
29+
defstruct [
30+
:name_or_initial_func,
31+
:current_func,
32+
process_count: 0,
33+
memory: 0,
34+
message_queue_len: 0
35+
]
36+
end
37+
38+
# Group process by name or initial function, and current function.
39+
# Then sort by memory usage and return the top consuming processe groups
40+
def process_groups(result_count \\ 50) do
41+
Process.list()
42+
|> Enum.reduce(%{}, fn pid, acc ->
43+
{name_or_initial_func, current_func} = grouping_attributes(pid)
44+
45+
Map.update(
46+
acc,
47+
{name_or_initial_func, current_func},
48+
%ProcessesStats{
49+
name_or_initial_func: name_or_initial_func,
50+
current_func: current_func
51+
},
52+
fn current_stats ->
53+
process_info = Process.info(pid, [:memory, :message_queue_len])
54+
55+
%{
56+
current_stats
57+
| process_count: current_stats.process_count + 1,
58+
memory: current_stats.memory + (process_info[:memory] || 0),
59+
message_queue_len:
60+
current_stats.message_queue_len + (process_info[:message_queue_len] || 0)
61+
}
62+
end
63+
)
64+
end)
65+
|> Map.values()
66+
|> Enum.sort(&(&1.memory > &2.memory))
67+
|> Enum.take(result_count)
68+
end
69+
70+
defp mfa_to_string({module, function, arity}) do
71+
"#{module}:#{function}/#{arity}"
72+
end
73+
74+
defp grouping_attributes(pid) do
75+
name_or_initial_func =
76+
case Process.info(pid, [:registered_name, :dictionary, :initial_call]) do
77+
[{:registered_name, name} | _rest] when is_atom(name) ->
78+
name
79+
80+
[{:registered_name, [first_name | _other_names]}, _rest] ->
81+
first_name
82+
83+
[
84+
{:registered_name, []},
85+
{:dictionary, [{:"$initial_call", initial_call} | _rest_dictionary]} | _rest
86+
] ->
87+
mfa_to_string(initial_call)
88+
89+
[
90+
{:registered_name, []},
91+
{:dictionary, _rest_dictionary},
92+
{:initial_call, initial_call}
93+
] ->
94+
mfa_to_string(initial_call)
95+
96+
_ ->
97+
:undefined
98+
end
99+
100+
case Process.info(pid, :current_function) do
101+
{:current_function, current_function} ->
102+
{name_or_initial_func, mfa_to_string(current_function)}
103+
104+
nil ->
105+
{name_or_initial_func, :undefined}
106+
end
107+
end
108+
end

0 commit comments

Comments
 (0)