Skip to content

Commit 10bee98

Browse files
authored
1 parent baadaf7 commit 10bee98

File tree

8 files changed

+262
-29
lines changed

8 files changed

+262
-29
lines changed

.changeset/wise-garlics-kneel.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@core/electric-telemetry': patch
3+
---
4+
5+
Restore the inclusion of stack_id in CallHomeReporter's static_info.

packages/electric-telemetry/lib/electric/telemetry/call_home_reporter.ex

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,10 @@ defmodule ElectricTelemetry.CallHomeReporter do
4141

4242
def report_home(telemetry_url, results) do
4343
# Isolate the request in a separate task to avoid blocking and
44-
# to not receive any messages from the HTTP pool internals
45-
Task.start(fn -> Req.post!(telemetry_url, json: results, retry: :transient) end)
44+
# to not receive any messages from the HTTP pool internals.
45+
# The task process must be linked to CallHomeReporter to avoid orphaned processes when the
46+
# CallHomeReporter is shut down deliberately by its supervisor.
47+
Task.async(fn -> Req.post!(telemetry_url, json: results, retry: :transient) end)
4648
:ok
4749
end
4850

@@ -52,6 +54,7 @@ defmodule ElectricTelemetry.CallHomeReporter do
5254

5355
defp cast_time_to_ms({time, :minute}), do: time * 60 * 1000
5456
defp cast_time_to_ms({time, :second}), do: time * 1000
57+
defp cast_time_to_ms({time, :millisecond}), do: time
5558

5659
@impl GenServer
5760
def init(opts) do
@@ -173,6 +176,19 @@ defmodule ElectricTelemetry.CallHomeReporter do
173176
{:noreply, state}
174177
end
175178

179+
# Catch-all clauses to handle the result, EXIT and DOWN messages from the async task started in `report_home()`.
180+
def handle_info({task_mon, %Req.Response{}}, state) when is_reference(task_mon) do
181+
{:noreply, state}
182+
end
183+
184+
def handle_info({:EXIT, _, _}, state) do
185+
{:noreply, state}
186+
end
187+
188+
def handle_info({:DOWN, _, :process, _, _}, state) do
189+
{:noreply, state}
190+
end
191+
176192
defp build_report(state) do
177193
%{
178194
last_reported: state.last_reported,

packages/electric-telemetry/lib/electric/telemetry/opts.ex

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,20 @@ defmodule ElectricTelemetry.Opts do
7373
resource: [type: :map, default: %{}]
7474
],
7575
default: []
76+
],
77+
call_home_reporter_opts: [
78+
type: :keyword_list,
79+
keys: [
80+
first_report_in: [
81+
type: {:tuple, [:pos_integer, {:in, [:millisecond, :second, :minute]}]},
82+
default: {2, :minute}
83+
],
84+
reporting_period: [
85+
type: {:tuple, [:pos_integer, {:in, [:millisecond, :second, :minute]}]},
86+
default: {30, :minute}
87+
]
88+
],
89+
default: []
7690
]
7791
]
7892
end

packages/electric-telemetry/lib/electric/telemetry/reporters/call_home_reporter.ex

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,14 @@ defmodule ElectricTelemetry.Reporters.CallHomeReporter do
22
import Telemetry.Metrics
33

44
def child_spec(telemetry_opts, reporter_opts) do
5-
if call_home_url = get_in(telemetry_opts, [:reporters, :call_home_url]) do
5+
if call_home_url = telemetry_opts.reporters.call_home_url do
66
start_opts =
7-
Keyword.merge(
8-
[
9-
static_info: static_info(telemetry_opts),
10-
call_home_url: call_home_url,
11-
first_report_in: {2, :minute},
12-
reporting_period: {30, :minute}
13-
],
14-
reporter_opts
15-
)
7+
[
8+
static_info: static_info(telemetry_opts),
9+
call_home_url: call_home_url
10+
]
11+
|> Keyword.merge(telemetry_opts.call_home_reporter_opts)
12+
|> Keyword.merge(reporter_opts)
1613

1714
{ElectricTelemetry.CallHomeReporter, start_opts}
1815
end
@@ -103,14 +100,24 @@ defmodule ElectricTelemetry.Reporters.CallHomeReporter do
103100

104101
%{
105102
electric_version: telemetry_opts.version,
106-
environment: %{
107-
os: %{family: os_family, name: os_name},
108-
arch: to_string(arch),
109-
cores: processors,
110-
ram: total_mem,
111-
electric_instance_id: Map.fetch!(telemetry_opts, :instance_id),
112-
electric_installation_id: Map.get(telemetry_opts, :installation_id, "electric_default")
113-
}
103+
environment:
104+
%{
105+
os: %{family: os_family, name: os_name},
106+
arch: to_string(arch),
107+
cores: processors,
108+
ram: total_mem,
109+
electric_instance_id: Map.fetch!(telemetry_opts, :instance_id),
110+
electric_installation_id: Map.get(telemetry_opts, :installation_id, "electric_default")
111+
}
112+
|> maybe_put(telemetry_opts, :stack_id)
114113
}
115114
end
115+
116+
defp maybe_put(map, telemetry_opts, key) do
117+
if val = telemetry_opts[key] do
118+
Map.put(map, key, val)
119+
else
120+
map
121+
end
122+
end
116123
end

packages/electric-telemetry/mix.exs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,11 @@ defmodule ElectricTelemetry.MixProject do
3333

3434
defp dev_and_test_deps do
3535
[
36+
{:bypass, "~> 2.1", only: [:test]},
3637
{:dialyxir, "~> 1.4", only: [:test], runtime: false},
38+
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
3739
{:excoveralls, "~> 0.18", only: [:test], runtime: false},
38-
{:junit_formatter, "~> 3.4", only: [:test], runtime: false},
39-
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false}
40+
{:junit_formatter, "~> 3.4", only: [:test], runtime: false}
4041
]
4142
end
4243

packages/electric-telemetry/mix.lock

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
%{
2+
"bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"},
3+
"cowboy": {:hex, :cowboy, "2.14.2", "4008be1df6ade45e4f2a4e9e2d22b36d0b5aba4e20b0a0d7049e28d124e34847", [:make, :rebar3], [{:cowlib, ">= 2.16.0 and < 3.0.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, ">= 1.8.0 and < 3.0.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "569081da046e7b41b5df36aa359be71a0c8874e5b9cff6f747073fc57baf1ab9"},
4+
"cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"},
5+
"cowlib": {:hex, :cowlib, "2.16.0", "54592074ebbbb92ee4746c8a8846e5605052f29309d3a873468d76cdf932076f", [:make, :rebar3], [], "hexpm", "7f478d80d66b747344f0ea7708c187645cfcc08b11aa424632f78e25bf05db51"},
26
"dialyxir": {:hex, :dialyxir, "1.4.7", "dda948fcee52962e4b6c5b4b16b2d8fa7d50d8645bbae8b8685c3f9ecb7f5f4d", [:mix], [{:erlex, ">= 0.2.8", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b34527202e6eb8cee198efec110996c25c5898f43a4094df157f8d28f27d9efe"},
37
"earmark_parser": {:hex, :earmark_parser, "1.4.44", "f20830dd6b5c77afe2b063777ddbbff09f9759396500cdbe7523efd58d7a339c", [:mix], [], "hexpm", "4778ac752b4701a5599215f7030989c989ffdc4f6df457c5f36938cc2d2a2750"},
48
"erlex": {:hex, :erlex, "0.2.8", "cd8116f20f3c0afe376d1e8d1f0ae2452337729f68be016ea544a72f767d9c12", [:mix], [], "hexpm", "9d66ff9fedf69e49dc3fd12831e12a8a37b76f8651dd21cd45fcf5561a8a7590"},
@@ -17,7 +21,11 @@
1721
"nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"},
1822
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},
1923
"otel_metric_exporter": {:hex, :otel_metric_exporter, "0.4.2", "2cf96ac9879eb06ebde26fa0856e2cd4d5b5f6127eb9ca587532b89a5f981bfc", [:mix], [{:finch, "~> 0.19", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:protobuf, "~> 0.15", [hex: :protobuf, repo: "hexpm", optional: false]}, {:retry, "~> 0.19", [hex: :retry, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "e7d7ca69a3863a2b84badeb9dd275a369754d951c7c5a4cd5f321be868c6d613"},
24+
"plug": {:hex, :plug, "1.19.1", "09bac17ae7a001a68ae393658aa23c7e38782be5c5c00c80be82901262c394c0", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "560a0017a8f6d5d30146916862aaf9300b7280063651dd7e532b8be168511e62"},
25+
"plug_cowboy": {:hex, :plug_cowboy, "2.7.5", "261f21b67aea8162239b2d6d3b4c31efde4daa22a20d80b19c2c0f21b34b270e", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "20884bf58a90ff5a5663420f5d2c368e9e15ed1ad5e911daf0916ea3c57f77ac"},
26+
"plug_crypto": {:hex, :plug_crypto, "2.1.1", "19bda8184399cb24afa10be734f84a16ea0a2bc65054e23a62bb10f06bc89491", [:mix], [], "hexpm", "6470bce6ffe41c8bd497612ffde1a7e4af67f36a15eea5f921af71cf3e11247c"},
2027
"protobuf": {:hex, :protobuf, "0.15.0", "c9fc1e9fc1682b05c601df536d5ff21877b55e2023e0466a3855cc1273b74dcb", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "5d7bb325319db1d668838d2691c31c7b793c34111aec87d5ee467a39dac6e051"},
28+
"ranch": {:hex, :ranch, "1.8.1", "208169e65292ac5d333d6cdbad49388c1ae198136e4697ae2f474697140f201c", [:make, :rebar3], [], "hexpm", "aed58910f4e21deea992a67bf51632b6d60114895eb03bb392bb733064594dd0"},
2129
"req": {:hex, :req, "0.5.16", "99ba6a36b014458e52a8b9a0543bfa752cb0344b2a9d756651db1281d4ba4450", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "974a7a27982b9b791df84e8f6687d21483795882a7840e8309abdbe08bb06f09"},
2230
"retry": {:hex, :retry, "0.19.0", "aeb326d87f62295d950f41e1255fe6f43280a1b390d36e280b7c9b00601ccbc2", [:mix], [], "hexpm", "85ef376aa60007e7bff565c366310966ec1bd38078765a0e7f20ec8a220d02ca"},
2331
"telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"},
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
defmodule ElectricTelemetry.CallHomeReporterTest do
2+
use ExUnit.Case, async: true
3+
4+
@telemetry_opts [
5+
instance_id: "test-instance_id",
6+
stack_id: "test-stack",
7+
version: "test-version",
8+
reporters: [
9+
call_home_url: "...fill this in inside the test case when bypass.port is known..."
10+
],
11+
call_home_reporter_opts: [first_report_in: {1, :millisecond}]
12+
]
13+
14+
setup do
15+
bypass = Bypass.open()
16+
%{bypass: bypass, telemetry_opts: telemetry_opts(bypass)}
17+
end
18+
19+
test "reports all expected info when started under ApplicationTelemetry", ctx do
20+
add_bypass_expectation(ctx, fn report ->
21+
# We assert the shape of the entire report here but values aren't valid since not enough
22+
# time has passed during the test execution to gather the data.
23+
assert %{
24+
"data" => %{
25+
"electric_version" => "test-version",
26+
"environment" => %{
27+
"arch" => _,
28+
"cores" => _,
29+
"electric_installation_id" => "electric_default",
30+
"electric_instance_id" => "test-instance_id",
31+
"os" => _,
32+
"ram" => _
33+
},
34+
"resources" => %{
35+
"run_queue_cpu" => %{"max" => _, "mean" => _, "min" => _},
36+
"run_queue_io" => %{"max" => _, "mean" => _, "min" => _},
37+
"run_queue_total" => %{"max" => _, "mean" => _, "min" => _},
38+
"uptime" => _,
39+
"used_memory" => %{"max" => _, "mean" => _, "min" => _}
40+
},
41+
"system" => %{
42+
"load_avg1" => _,
43+
"load_avg15" => _,
44+
"load_avg5" => _,
45+
"memory_free" => _,
46+
"memory_free_percent" => _,
47+
"memory_used" => _,
48+
"memory_used_percent" => _,
49+
"swap_free" => _,
50+
"swap_free_percent" => _,
51+
"swap_used" => _,
52+
"swap_used_percent" => _
53+
}
54+
},
55+
"last_reported" => _,
56+
"report_version" => _,
57+
"timestamp" => _
58+
} = report
59+
end)
60+
61+
start_supervised!({ElectricTelemetry.ApplicationTelemetry, ctx.telemetry_opts})
62+
63+
assert_receive :bypass_done, 500
64+
end
65+
66+
test "reports all expected info when started under StackTelemetry", ctx do
67+
add_bypass_expectation(ctx, fn report ->
68+
# We assert the shape of the entire report here but values aren't valid since not enough
69+
# time has passed during the test execution to gather the data.
70+
assert %{
71+
"data" => %{
72+
"electric_version" => "test-version",
73+
"environment" => %{
74+
"arch" => _,
75+
"cores" => _,
76+
"electric_installation_id" => "electric_default",
77+
"electric_instance_id" => "test-instance_id",
78+
"os" => _,
79+
"pg_version" => _,
80+
"ram" => _,
81+
"stack_id" => "test-stack"
82+
},
83+
"usage" => %{
84+
"active_shapes" => _,
85+
"inbound_bytes" => _,
86+
"inbound_operations" => _,
87+
"inbound_transactions" => _,
88+
"live_requests" => _,
89+
"served_bytes" => _,
90+
"stored_bytes" => _,
91+
"stored_operations" => _,
92+
"stored_transactions" => _,
93+
"sync_requests" => _,
94+
"total_shapes" => _,
95+
"total_used_storage_kb" => _,
96+
"unique_clients" => _,
97+
"wal_size" => %{"max" => _, "mean" => _, "min" => _}
98+
}
99+
},
100+
"last_reported" => _,
101+
"report_version" => _,
102+
"timestamp" => _
103+
} = report
104+
end)
105+
106+
start_supervised!({ElectricTelemetry.StackTelemetry, ctx.telemetry_opts})
107+
108+
assert_receive :bypass_done, 500
109+
end
110+
111+
defp add_bypass_expectation(%{bypass: bypass, telemetry_opts: telemetry_opts}, test_specific_fn) do
112+
test_pid = self()
113+
114+
Bypass.expect(bypass, "POST", "/checkpoint", fn conn ->
115+
assert {"content-type", "application/json"} in conn.req_headers
116+
assert {:ok, body, conn} = Plug.Conn.read_body(conn)
117+
118+
report = :json.decode(body)
119+
120+
assert_call_home_report_common_fields(report, telemetry_opts)
121+
test_specific_fn.(report)
122+
123+
send(test_pid, :bypass_done)
124+
125+
Plug.Conn.send_resp(conn, 200, "")
126+
end)
127+
end
128+
129+
defp telemetry_opts(bypass) do
130+
# CallHomeReporter can work with both a string and a URI struct
131+
url = "http://localhost:#{bypass.port}/checkpoint" |> maybe_parse_url()
132+
put_in(@telemetry_opts, [:reporters, :call_home_url], url)
133+
end
134+
135+
defp maybe_parse_url(url) do
136+
if :rand.uniform(2) == 1 do
137+
URI.parse(url)
138+
else
139+
url
140+
end
141+
end
142+
143+
def assert_call_home_report_common_fields(report, telemetry_opts) do
144+
# Extracting only those fields from the report that will be validated in this function.
145+
%{
146+
"data" => %{
147+
"environment" => %{
148+
"arch" => arch,
149+
"cores" => cores,
150+
"os" => %{"family" => "unix", "name" => os_name},
151+
"ram" => ram
152+
}
153+
},
154+
"last_reported" => last_reported,
155+
"report_version" => 2,
156+
"timestamp" => timestamp
157+
} = report
158+
159+
# Execute CallHomeReporter.static_info() here to make the assertions resilient to platform
160+
# variations between different envinronments in which this test will run.
161+
static_info =
162+
telemetry_opts
163+
|> ElectricTelemetry.validate_options()
164+
|> then(fn {:ok, opts} ->
165+
ElectricTelemetry.Reporters.CallHomeReporter.static_info(opts)
166+
end)
167+
168+
assert arch == static_info.environment.arch
169+
# If you get an assertion failure here when running the test suite on your dev
170+
# machine, please add your arch to the list.
171+
assert arch in ["x86_64-pc-linux-gnu"]
172+
173+
assert cores == static_info.environment.cores
174+
assert is_integer(cores)
175+
assert cores >= 4
176+
177+
assert os_name == to_string(static_info.environment.os.name)
178+
# If you get an assertion failure here, please add your OS name to the list.
179+
assert os_name in ["linux"]
180+
181+
assert ram == static_info.environment.ram
182+
assert is_integer(ram)
183+
assert ram >= 4 * 1024 * 1024 * 1024
184+
185+
assert {:ok, %DateTime{}, 0} = DateTime.from_iso8601(last_reported)
186+
assert {:ok, %DateTime{}, 0} = DateTime.from_iso8601(timestamp)
187+
end
188+
end
Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,2 @@
1-
# The process registry is implicitly used by processes in the dev, prod and test environments alike.
2-
#
3-
# Explicitly start the process registry here since the OTP application does not start a
4-
# supervision tree in the test environment.
5-
# Registry.start_link(name: Electric.Application.process_registry(), keys: :unique)
6-
71
ExUnit.configure(formatters: [JUnitFormatter, ExUnit.CLIFormatter])
8-
ExUnit.start(assert_receive_timeout: 400, exclude: [:slow], capture_log: true)
2+
ExUnit.start(capture_log: true)

0 commit comments

Comments
 (0)