Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions lib/realtime/metrics_pusher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,45 @@ defmodule Realtime.MetricsPusher do
defp schedule_push(delay), do: Process.send_after(self(), :push, delay)

defp push(req_options) do
tasks = [
Task.Supervisor.async_nolink(Realtime.TaskSupervisor, fn ->
push_metrics("global", &Realtime.PromEx.get_global_metrics/0, req_options)
end),
Task.Supervisor.async_nolink(Realtime.TaskSupervisor, fn ->
push_metrics("tenant", &Realtime.TenantPromEx.get_metrics/0, req_options)
end)
]

tasks
|> Task.yield_many(:timer.minutes(1))
|> Enum.each(fn
{task, nil} ->
Task.shutdown(task, :brutal_kill)
Logger.error("MetricsPusher: Task timed out: #{inspect(task)}")

{_task, {:exit, reason}} ->
Logger.error("MetricsPusher: Task exited with reason: #{inspect(reason)}")

{_task, {:ok, _}} ->
:ok
end)
end

defp push_metrics(label, get_metrics_fn, req_options) do
try do
metrics = Realtime.PromEx.get_metrics()
metrics = get_metrics_fn.()

case send_metrics(req_options, metrics) do
:ok ->
:ok

{:error, reason} ->
Logger.error("MetricsPusher: Failed to push metrics to #{req_options[:url]}: #{inspect(reason)}")
Logger.error("MetricsPusher: Failed to push #{label} metrics to #{req_options[:url]}: #{inspect(reason)}")
:ok
end
rescue
error ->
Logger.error("MetricsPusher: Exception during push: #{inspect(error)}")
Logger.error("MetricsPusher: Exception during #{label} push: #{inspect(error)}")
:ok
end
end
Expand Down
51 changes: 35 additions & 16 deletions test/realtime/metrics_pusher_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ defmodule Realtime.MetricsPusherTest do
timeout: 5000
]

:telemetry.execute([:realtime, :channel, :input_bytes], %{size: 1024}, %{tenant: "test_tenant"})

parent = self()

Req.Test.expect(MetricsPusher, fn conn ->
body = Req.Test.raw_body(conn)
# Expect 2 requests: one for global metrics, one for tenant metrics
Req.Test.expect(MetricsPusher, 2, fn conn ->
assert conn.method == "POST"
assert :zlib.gunzip(body) =~ "# HELP beam_stats_run_queue_count"
assert conn.scheme == :https
assert conn.host == "example.com"
assert conn.port == 8428
Expand All @@ -44,12 +45,27 @@ defmodule Realtime.MetricsPusherTest do
assert Conn.get_req_header(conn, "content-encoding") == ["gzip"]
assert Conn.get_req_header(conn, "content-type") == ["text/plain"]

send(parent, :req_called)
body = Req.Test.raw_body(conn)
decompressed_body = :zlib.gunzip(body)

# Collect decompressed bodies so we can assert that one has global metrics
# and the other has tenant metrics.
send(parent, {:req_called, decompressed_body})
Req.Test.text(conn, "")
end)

{:ok, _pid} = start_and_allow_pusher(opts)
assert_receive :req_called, 100

# Receive both request bodies
assert_receive {:req_called, body1}, 100
assert_receive {:req_called, body2}, 100

global_metric = ~r/beam_stats_run_queue_count/
tenant_metric = ~r/realtime_channel_input_bytes/

# One request must contain a global-only metric, the other a tenant-only metric.
assert (Regex.match?(global_metric, body1) and Regex.match?(tenant_metric, body2)) or
(Regex.match?(global_metric, body2) and Regex.match?(tenant_metric, body1))
end

test "sends request successfully without auth header" do
Expand All @@ -62,9 +78,7 @@ defmodule Realtime.MetricsPusherTest do

parent = self()

Req.Test.expect(MetricsPusher, fn conn ->
body = Req.Test.raw_body(conn)
assert :zlib.gunzip(body) =~ "# HELP beam_stats_run_queue_count"
Req.Test.expect(MetricsPusher, 2, fn conn ->
assert Conn.get_req_header(conn, "authorization") == []

send(parent, :req_called)
Expand All @@ -73,6 +87,7 @@ defmodule Realtime.MetricsPusherTest do

{:ok, _pid} = start_and_allow_pusher(opts)
assert_receive :req_called, 100
assert_receive :req_called, 100
end

test "sends request body untouched when compress=false" do
Expand All @@ -87,9 +102,7 @@ defmodule Realtime.MetricsPusherTest do

parent = self()

Req.Test.expect(MetricsPusher, fn conn ->
body = Req.Test.raw_body(conn)
assert body =~ "# HELP beam_stats_run_queue_count"
Req.Test.expect(MetricsPusher, 2, fn conn ->
assert Conn.get_req_header(conn, "content-encoding") == []
assert Conn.get_req_header(conn, "content-type") == ["text/plain"]

Expand All @@ -99,6 +112,7 @@ defmodule Realtime.MetricsPusherTest do

{:ok, _pid} = start_and_allow_pusher(opts)
assert_receive :req_called, 100
assert_receive :req_called, 100
end

test "when request receives non 2XX response" do
Expand All @@ -114,19 +128,21 @@ defmodule Realtime.MetricsPusherTest do

log =
capture_log(fn ->
Req.Test.expect(MetricsPusher, fn conn ->
Req.Test.expect(MetricsPusher, 2, fn conn ->
send(parent, :req_called)
Conn.send_resp(conn, 500, "")
end)

{:ok, pid} = start_and_allow_pusher(opts)
assert_receive :req_called, 100
assert_receive :req_called, 100
assert Process.alive?(pid)
# Wait enough for the log to be captured
Process.sleep(100)
end)

assert log =~ "MetricsPusher: Failed to push metrics to"
assert log =~ "MetricsPusher: Failed to push"
assert log =~ "metrics to"
end

test "when an error is raised" do
Expand All @@ -140,25 +156,27 @@ defmodule Realtime.MetricsPusherTest do

log =
capture_log(fn ->
Req.Test.expect(MetricsPusher, fn _conn ->
Req.Test.expect(MetricsPusher, 2, fn _conn ->
send(parent, :req_called)
raise RuntimeError, "unexpected error"
end)

{:ok, pid} = start_and_allow_pusher(opts)
assert_receive :req_called, 100
assert_receive :req_called, 100
assert Process.alive?(pid)
# Wait enough for the log to be captured
Process.sleep(100)
end)

assert log =~ "MetricsPusher: Exception during push: %RuntimeError{message: \"unexpected error\"}"
assert log =~ "MetricsPusher: Exception during"
assert log =~ "push: %RuntimeError{message: \"unexpected error\"}"
end

test "logs unexpected messages and stays alive" do
parent = self()

Req.Test.expect(MetricsPusher, fn conn ->
Req.Test.expect(MetricsPusher, 2, fn conn ->
send(parent, :push_happened)
Req.Test.text(conn, "")
end)
Expand All @@ -171,6 +189,7 @@ defmodule Realtime.MetricsPusherTest do
)

assert_receive :push_happened, 500
assert_receive :push_happened, 500

log =
capture_log(fn ->
Expand Down
Loading