Skip to content

Commit f53439a

Browse files
committed
fix: change MetricsPusher to send tenant metrics
Also use Tasks to ensure that they drop the used memory after aggregating such metrics
1 parent 0916b0d commit f53439a

File tree

2 files changed

+62
-19
lines changed

2 files changed

+62
-19
lines changed

lib/realtime/metrics_pusher.ex

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,20 +97,45 @@ defmodule Realtime.MetricsPusher do
9797
defp schedule_push(delay), do: Process.send_after(self(), :push, delay)
9898

9999
defp push(req_options) do
100+
tasks = [
101+
Task.Supervisor.async_nolink(Realtime.TaskSupervisor, fn ->
102+
push_metrics("global", &Realtime.PromEx.get_global_metrics/0, req_options)
103+
end),
104+
Task.Supervisor.async_nolink(Realtime.TaskSupervisor, fn ->
105+
push_metrics("tenant", &Realtime.TenantPromEx.get_metrics/0, req_options)
106+
end)
107+
]
108+
109+
tasks
110+
|> Task.yield_many(:timer.minutes(1))
111+
|> Enum.each(fn
112+
{task, nil} ->
113+
Task.shutdown(task, :brutal_kill)
114+
Logger.error("MetricsPusher: Task timed out: #{inspect(task)}")
115+
116+
{_task, {:exit, reason}} ->
117+
Logger.error("MetricsPusher: Task exited with reason: #{inspect(reason)}")
118+
119+
{_task, {:ok, _}} ->
120+
:ok
121+
end)
122+
end
123+
124+
defp push_metrics(label, get_metrics_fn, req_options) do
100125
try do
101-
metrics = Realtime.PromEx.get_metrics()
126+
metrics = get_metrics_fn.()
102127

103128
case send_metrics(req_options, metrics) do
104129
:ok ->
105130
:ok
106131

107132
{:error, reason} ->
108-
Logger.error("MetricsPusher: Failed to push metrics to #{req_options[:url]}: #{inspect(reason)}")
133+
Logger.error("MetricsPusher: Failed to push #{label} metrics to #{req_options[:url]}: #{inspect(reason)}")
109134
:ok
110135
end
111136
rescue
112137
error ->
113-
Logger.error("MetricsPusher: Exception during push: #{inspect(error)}")
138+
Logger.error("MetricsPusher: Exception during #{label} push: #{inspect(error)}")
114139
:ok
115140
end
116141
end

test/realtime/metrics_pusher_test.exs

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,13 @@ defmodule Realtime.MetricsPusherTest do
3030
timeout: 5000
3131
]
3232

33+
:telemetry.execute([:realtime, :channel, :input_bytes], %{size: 1024}, %{tenant: "test_tenant"})
34+
3335
parent = self()
3436

35-
Req.Test.expect(MetricsPusher, fn conn ->
36-
body = Req.Test.raw_body(conn)
37+
# Expect 2 requests: one for global metrics, one for tenant metrics
38+
Req.Test.expect(MetricsPusher, 2, fn conn ->
3739
assert conn.method == "POST"
38-
assert :zlib.gunzip(body) =~ "# HELP beam_stats_run_queue_count"
3940
assert conn.scheme == :https
4041
assert conn.host == "example.com"
4142
assert conn.port == 8428
@@ -44,12 +45,26 @@ defmodule Realtime.MetricsPusherTest do
4445
assert Conn.get_req_header(conn, "content-encoding") == ["gzip"]
4546
assert Conn.get_req_header(conn, "content-type") == ["text/plain"]
4647

47-
send(parent, :req_called)
48+
body = Req.Test.raw_body(conn)
49+
decompressed_body = :zlib.gunzip(body)
50+
51+
# Collect decompressed bodies so we can assert that one has global metrics
52+
# and the other has tenant metrics.
53+
send(parent, {:req_called, decompressed_body})
4854
Req.Test.text(conn, "")
4955
end)
5056

5157
{:ok, _pid} = start_and_allow_pusher(opts)
52-
assert_receive :req_called, 100
58+
59+
# Receive both request bodies
60+
assert_receive {:req_called, body1}, 100
61+
assert_receive {:req_called, body2}, 100
62+
63+
# One request must contain a global-only metric, the other a tenant-only metric.
64+
assert (Regex.match?(~r/beam_stats_run_queue_count/, body1) and
65+
Regex.match?(~r/realtime_channel_input_bytes.*tenant=/, body2)) or
66+
(Regex.match?(~r/beam_stats_run_queue_count/, body2) and
67+
Regex.match?(~r/realtime_channel_input_bytes.*tenant=/, body1))
5368
end
5469

5570
test "sends request successfully without auth header" do
@@ -62,9 +77,7 @@ defmodule Realtime.MetricsPusherTest do
6277

6378
parent = self()
6479

65-
Req.Test.expect(MetricsPusher, fn conn ->
66-
body = Req.Test.raw_body(conn)
67-
assert :zlib.gunzip(body) =~ "# HELP beam_stats_run_queue_count"
80+
Req.Test.expect(MetricsPusher, 2, fn conn ->
6881
assert Conn.get_req_header(conn, "authorization") == []
6982

7083
send(parent, :req_called)
@@ -73,6 +86,7 @@ defmodule Realtime.MetricsPusherTest do
7386

7487
{:ok, _pid} = start_and_allow_pusher(opts)
7588
assert_receive :req_called, 100
89+
assert_receive :req_called, 100
7690
end
7791

7892
test "sends request body untouched when compress=false" do
@@ -87,9 +101,7 @@ defmodule Realtime.MetricsPusherTest do
87101

88102
parent = self()
89103

90-
Req.Test.expect(MetricsPusher, fn conn ->
91-
body = Req.Test.raw_body(conn)
92-
assert body =~ "# HELP beam_stats_run_queue_count"
104+
Req.Test.expect(MetricsPusher, 2, fn conn ->
93105
assert Conn.get_req_header(conn, "content-encoding") == []
94106
assert Conn.get_req_header(conn, "content-type") == ["text/plain"]
95107

@@ -99,6 +111,7 @@ defmodule Realtime.MetricsPusherTest do
99111

100112
{:ok, _pid} = start_and_allow_pusher(opts)
101113
assert_receive :req_called, 100
114+
assert_receive :req_called, 100
102115
end
103116

104117
test "when request receives non 2XX response" do
@@ -114,19 +127,21 @@ defmodule Realtime.MetricsPusherTest do
114127

115128
log =
116129
capture_log(fn ->
117-
Req.Test.expect(MetricsPusher, fn conn ->
130+
Req.Test.expect(MetricsPusher, 2, fn conn ->
118131
send(parent, :req_called)
119132
Conn.send_resp(conn, 500, "")
120133
end)
121134

122135
{:ok, pid} = start_and_allow_pusher(opts)
123136
assert_receive :req_called, 100
137+
assert_receive :req_called, 100
124138
assert Process.alive?(pid)
125139
# Wait enough for the log to be captured
126140
Process.sleep(100)
127141
end)
128142

129-
assert log =~ "MetricsPusher: Failed to push metrics to"
143+
assert log =~ "MetricsPusher: Failed to push"
144+
assert log =~ "metrics to"
130145
end
131146

132147
test "when an error is raised" do
@@ -140,25 +155,27 @@ defmodule Realtime.MetricsPusherTest do
140155

141156
log =
142157
capture_log(fn ->
143-
Req.Test.expect(MetricsPusher, fn _conn ->
158+
Req.Test.expect(MetricsPusher, 2, fn _conn ->
144159
send(parent, :req_called)
145160
raise RuntimeError, "unexpected error"
146161
end)
147162

148163
{:ok, pid} = start_and_allow_pusher(opts)
149164
assert_receive :req_called, 100
165+
assert_receive :req_called, 100
150166
assert Process.alive?(pid)
151167
# Wait enough for the log to be captured
152168
Process.sleep(100)
153169
end)
154170

155-
assert log =~ "MetricsPusher: Exception during push: %RuntimeError{message: \"unexpected error\"}"
171+
assert log =~ "MetricsPusher: Exception during"
172+
assert log =~ "push: %RuntimeError{message: \"unexpected error\"}"
156173
end
157174

158175
test "logs unexpected messages and stays alive" do
159176
parent = self()
160177

161-
Req.Test.expect(MetricsPusher, fn conn ->
178+
Req.Test.expect(MetricsPusher, 2, fn conn ->
162179
send(parent, :push_happened)
163180
Req.Test.text(conn, "")
164181
end)
@@ -171,6 +188,7 @@ defmodule Realtime.MetricsPusherTest do
171188
)
172189

173190
assert_receive :push_happened, 500
191+
assert_receive :push_happened, 500
174192

175193
log =
176194
capture_log(fn ->

0 commit comments

Comments
 (0)