Skip to content

Commit cc1143e

Browse files
authored
fix: change metrics cleaner to also clean up after Connect shutdown (#1674)
Once Connect for the tenant shuts down, clean up its metrics after the threshold has passed.
1 parent df022f9 commit cc1143e

File tree

5 files changed

+234
-17
lines changed

5 files changed

+234
-17
lines changed

lib/realtime/metrics_cleaner.ex

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,26 @@ defmodule Realtime.MetricsCleaner do
66

77
defstruct [:check_ref, :interval]
88

9-
def handle_event([:beacon, :users, :group, :vacant], _measurements, %{group: tenant_id}, cleaner_table) do
10-
:ets.insert(cleaner_table, {tenant_id, DateTime.to_unix(DateTime.utc_now(), :second)})
9+
def handle_beacon_event([:beacon, :users, :group, :vacant], _, %{group: tenant_id}, vacant_websockets) do
10+
:ets.insert(vacant_websockets, {tenant_id, DateTime.to_unix(DateTime.utc_now(), :second)})
1111
end
1212

13-
def handle_event([:beacon, :users, :group, :occupied], _measurements, %{group: tenant_id}, cleaner_table) do
14-
:ets.delete(cleaner_table, tenant_id)
13+
def handle_beacon_event([:beacon, :users, :group, :occupied], _, %{group: tenant_id}, vacant_websockets) do
14+
:ets.delete(vacant_websockets, tenant_id)
15+
end
16+
17+
def handle_syn_event([:syn, Realtime.Tenants.Connect, :unregistered], _, %{name: tenant_id}, disconnected_tenants) do
18+
:ets.insert(disconnected_tenants, {tenant_id, DateTime.to_unix(DateTime.utc_now(), :second)})
19+
end
20+
21+
def handle_syn_event([:syn, Realtime.Tenants.Connect, :registered], _, %{name: tenant_id}, disconnected_tenants) do
22+
:ets.delete(disconnected_tenants, tenant_id)
1523
end
1624

1725
def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
1826

19-
# 5 minutes
20-
@default_vacant_metric_threshold_in_seconds 300
27+
# 10 minutes
28+
@default_vacant_metric_threshold_in_seconds 600
2129

2230
@impl true
2331
def init(opts) do
@@ -30,32 +38,54 @@ defmodule Realtime.MetricsCleaner do
3038

3139
Logger.info("Starting MetricsCleaner")
3240

33-
tid = :ets.new(:metrics_cleaner, [:set, :public, read_concurrency: false, write_concurrency: :auto])
41+
vacant_websockets = :ets.new(:vacant_websockets, [:set, :public, read_concurrency: false, write_concurrency: :auto])
42+
43+
disconnected_tenants =
44+
:ets.new(:disconnected_tenants, [:set, :public, read_concurrency: false, write_concurrency: :auto])
3445

3546
:ok =
3647
:telemetry.attach_many(
37-
self(),
48+
[self(), :vacant_websockets],
3849
[[:beacon, :users, :group, :occupied], [:beacon, :users, :group, :vacant]],
39-
&__MODULE__.handle_event/4,
40-
tid
50+
&__MODULE__.handle_beacon_event/4,
51+
vacant_websockets
52+
)
53+
54+
:ok =
55+
:telemetry.attach_many(
56+
[self(), :disconnected_tenants],
57+
[[:syn, Realtime.Tenants.Connect, :registered], [:syn, Realtime.Tenants.Connect, :unregistered]],
58+
&__MODULE__.handle_syn_event/4,
59+
disconnected_tenants
4160
)
4261

4362
{:ok,
4463
%{
4564
check_ref: check(interval),
4665
interval: interval,
4766
vacant_metric_threshold_in_seconds: vacant_metric_threshold_in_seconds,
48-
tid: tid
67+
vacant_websockets: vacant_websockets,
68+
disconnected_tenants: disconnected_tenants
4969
}}
5070
end
5171

72+
@impl true
73+
def terminate(_reason, _state) do
74+
:telemetry.detach([self(), :vacant_websockets])
75+
:telemetry.detach([self(), :disconnected_tenants])
76+
:ok
77+
end
78+
5279
@impl true
5380
def handle_info(:check, %{interval: interval} = state) do
5481
Process.cancel_timer(state.check_ref)
5582

5683
{exec_time, _} =
5784
:timer.tc(
58-
fn -> loop_and_cleanup_metrics_table(state.tid, state.vacant_metric_threshold_in_seconds) end,
85+
fn ->
86+
loop_and_cleanup_metrics_table(state.vacant_websockets, state.vacant_metric_threshold_in_seconds)
87+
loop_and_cleanup_metrics_table(state.disconnected_tenants, state.vacant_metric_threshold_in_seconds)
88+
end,
5989
:millisecond
6090
)
6191

lib/realtime/syn_handler.ex

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ defmodule Realtime.SynHandler do
2929
end
3030
end
3131

32+
@impl true
33+
def on_process_registered(scope, name, _pid, _meta, _reason) do
34+
:telemetry.execute([:syn, scope, :registered], %{}, %{name: name})
35+
end
36+
3237
@doc """
3338
When processes registered with :syn are unregistered, either manually or by stopping, this
3439
callback is invoked.
@@ -40,6 +45,8 @@ defmodule Realtime.SynHandler do
4045
"""
4146
@impl true
4247
def on_process_unregistered(scope, name, pid, _meta, reason) do
48+
:telemetry.execute([:syn, scope, :unregistered], %{}, %{name: name})
49+
4350
case Atom.to_string(scope) do
4451
@postgres_cdc_scope_prefix <> _ = scope ->
4552
Endpoint.local_broadcast(PostgresCdc.syn_topic(name), scope <> "_down", %{pid: pid, reason: reason})

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.71.3",
7+
version: "2.71.4",
88
elixir: "~> 1.18",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/realtime/metrics_cleaner_test.exs

Lines changed: 127 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ defmodule Realtime.MetricsCleanerTest do
22
use Realtime.DataCase, async: true
33

44
alias Realtime.MetricsCleaner
5+
alias Realtime.Tenants.Connect
56

6-
describe "metrics cleanup" do
7+
describe "metrics cleanup - vacant websockets" do
78
test "cleans up metrics for users that have been disconnected" do
89
:telemetry.execute(
910
[:realtime, :connections],
@@ -48,15 +49,15 @@ defmodule Realtime.MetricsCleanerTest do
4849
# Wait for clean up to run
4950
Process.sleep(200)
5051

51-
# Nothing changes
52+
# Nothing changes yet (threshold not reached)
5253
metrics = Realtime.PromEx.get_metrics() |> IO.iodata_to_binary()
5354

5455
assert String.contains?(metrics, "tenant=\"occupied-tenant\"")
5556
assert String.contains?(metrics, "tenant=\"vacant-tenant1\"")
5657
assert String.contains?(metrics, "tenant=\"vacant-tenant2\"")
5758

58-
# Wait for clean up to run again
59-
Process.sleep(2100)
59+
# Wait for threshold to pass and cleanup to run
60+
Process.sleep(2200)
6061

6162
# vacant tenant metrics are now gone
6263
metrics = Realtime.PromEx.get_metrics() |> IO.iodata_to_binary()
@@ -65,5 +66,127 @@ defmodule Realtime.MetricsCleanerTest do
6566
refute String.contains?(metrics, "tenant=\"vacant-tenant1\"")
6667
refute String.contains?(metrics, "tenant=\"vacant-tenant2\"")
6768
end
69+
70+
test "does not clean up metrics if websockets reconnect before threshold" do
71+
:telemetry.execute(
72+
[:realtime, :connections],
73+
%{connected: 1, connected_cluster: 10, limit: 100},
74+
%{tenant: "reconnect-tenant"}
75+
)
76+
77+
pid = spawn_link(fn -> Process.sleep(:infinity) end)
78+
79+
Beacon.join(:users, "reconnect-tenant", pid)
80+
81+
metrics = Realtime.PromEx.get_metrics() |> IO.iodata_to_binary()
82+
assert String.contains?(metrics, "tenant=\"reconnect-tenant\"")
83+
84+
start_supervised!(
85+
{MetricsCleaner, [metrics_cleaner_schedule_timer_in_ms: 100, vacant_metric_threshold_in_seconds: 1]}
86+
)
87+
88+
# Disconnect
89+
Beacon.leave(:users, "reconnect-tenant", pid)
90+
Process.sleep(500)
91+
92+
# Reconnect before threshold
93+
pid2 = spawn_link(fn -> Process.sleep(:infinity) end)
94+
Beacon.join(:users, "reconnect-tenant", pid2)
95+
96+
# Wait for cleanup to run
97+
Process.sleep(2200)
98+
99+
# Metrics should still be present
100+
metrics = Realtime.PromEx.get_metrics() |> IO.iodata_to_binary()
101+
assert String.contains?(metrics, "tenant=\"reconnect-tenant\"")
102+
end
103+
end
104+
105+
describe "metrics cleanup - disconnected tenants" do
106+
test "cleans up metrics for tenants that have been unregistered" do
107+
:telemetry.execute(
108+
[:realtime, :connections],
109+
%{connected: 1, connected_cluster: 10, limit: 100},
110+
%{tenant: "connected-tenant"}
111+
)
112+
113+
:telemetry.execute(
114+
[:realtime, :connections],
115+
%{connected: 0, connected_cluster: 20, limit: 100},
116+
%{tenant: "disconnected-tenant1"}
117+
)
118+
119+
:telemetry.execute(
120+
[:realtime, :connections],
121+
%{connected: 0, connected_cluster: 20, limit: 100},
122+
%{tenant: "disconnected-tenant2"}
123+
)
124+
125+
metrics = Realtime.PromEx.get_metrics() |> IO.iodata_to_binary()
126+
127+
assert String.contains?(metrics, "tenant=\"connected-tenant\"")
128+
assert String.contains?(metrics, "tenant=\"disconnected-tenant1\"")
129+
assert String.contains?(metrics, "tenant=\"disconnected-tenant2\"")
130+
131+
start_supervised!(
132+
{MetricsCleaner, [metrics_cleaner_schedule_timer_in_ms: 100, vacant_metric_threshold_in_seconds: 1]}
133+
)
134+
135+
# Simulate tenant registration (connected)
136+
:telemetry.execute([:syn, Connect, :registered], %{}, %{name: "connected-tenant"})
137+
138+
# Simulate tenant unregistration (disconnected)
139+
:telemetry.execute([:syn, Connect, :unregistered], %{}, %{name: "disconnected-tenant1"})
140+
:telemetry.execute([:syn, Connect, :unregistered], %{}, %{name: "disconnected-tenant2"})
141+
142+
# Wait for clean up to run
143+
Process.sleep(200)
144+
145+
# Nothing changes yet (threshold not reached)
146+
metrics = Realtime.PromEx.get_metrics() |> IO.iodata_to_binary()
147+
148+
assert String.contains?(metrics, "tenant=\"connected-tenant\"")
149+
assert String.contains?(metrics, "tenant=\"disconnected-tenant1\"")
150+
assert String.contains?(metrics, "tenant=\"disconnected-tenant2\"")
151+
152+
# Wait for threshold to pass and cleanup to run
153+
Process.sleep(2200)
154+
155+
# disconnected tenant metrics are now gone
156+
metrics = Realtime.PromEx.get_metrics() |> IO.iodata_to_binary()
157+
158+
assert String.contains?(metrics, "tenant=\"connected-tenant\"")
159+
refute String.contains?(metrics, "tenant=\"disconnected-tenant1\"")
160+
refute String.contains?(metrics, "tenant=\"disconnected-tenant2\"")
161+
end
162+
163+
test "does not clean up metrics if tenant reconnects before threshold" do
164+
:telemetry.execute(
165+
[:realtime, :connections],
166+
%{connected: 1, connected_cluster: 10, limit: 100},
167+
%{tenant: "reconnect-tenant"}
168+
)
169+
170+
metrics = Realtime.PromEx.get_metrics() |> IO.iodata_to_binary()
171+
assert String.contains?(metrics, "tenant=\"reconnect-tenant\"")
172+
173+
start_supervised!(
174+
{MetricsCleaner, [metrics_cleaner_schedule_timer_in_ms: 100, vacant_metric_threshold_in_seconds: 1]}
175+
)
176+
177+
# Simulate tenant unregistration
178+
:telemetry.execute([:syn, Connect, :unregistered], %{}, %{name: "reconnect-tenant"})
179+
Process.sleep(500)
180+
181+
# Re-register before threshold
182+
:telemetry.execute([:syn, Connect, :registered], %{}, %{name: "reconnect-tenant"})
183+
184+
# Wait for cleanup to run
185+
Process.sleep(2200)
186+
187+
# Metrics should still be present
188+
metrics = Realtime.PromEx.get_metrics() |> IO.iodata_to_binary()
189+
assert String.contains?(metrics, "tenant=\"reconnect-tenant\"")
190+
end
68191
end
69192
end

test/realtime/syn_handler_test.exs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,11 +227,68 @@ defmodule Realtime.SynHandlerTest do
227227
end
228228
end
229229

230+
describe "on_process_registered/5" do
231+
test "emits telemetry event for process registration" do
232+
pid = self()
233+
meta = %{some: :meta}
234+
reason = :normal
235+
236+
# Attach a test handler to capture the telemetry event
237+
test_pid = self()
238+
handler_id = [:test, :syn_handler, :registered]
239+
240+
:telemetry.attach(
241+
handler_id,
242+
[:syn, @mod, :registered],
243+
fn event, measurements, metadata, _config ->
244+
send(test_pid, {:telemetry_event, event, measurements, metadata})
245+
end,
246+
nil
247+
)
248+
249+
on_exit(fn -> :telemetry.detach(handler_id) end)
250+
251+
assert SynHandler.on_process_registered(@mod, @name, pid, meta, reason) == :ok
252+
253+
assert_receive {:telemetry_event, [:syn, @mod, :registered], %{}, %{name: @name}}
254+
end
255+
end
256+
230257
describe "on_process_unregistered/5" do
231258
setup do
232259
RealtimeWeb.Endpoint.subscribe("#{@topic}:#{@name}")
233260
end
234261

262+
test "emits telemetry event for process unregistration" do
263+
reason = :normal
264+
pid = self()
265+
266+
# Attach a test handler to capture the telemetry event
267+
test_pid = self()
268+
handler_id = [:test, :syn_handler, :unregistered]
269+
270+
:telemetry.attach(
271+
handler_id,
272+
[:syn, @mod, :unregistered],
273+
fn event, measurements, metadata, _config ->
274+
send(test_pid, {:telemetry_event, event, measurements, metadata})
275+
end,
276+
nil
277+
)
278+
279+
on_exit(fn -> :telemetry.detach(handler_id) end)
280+
281+
capture_log(fn ->
282+
assert SynHandler.on_process_unregistered(@mod, @name, pid, %{}, reason) == :ok
283+
end)
284+
285+
assert_receive {:telemetry_event, [:syn, @mod, :unregistered], %{}, %{name: @name}}
286+
287+
topic = "#{@topic}:#{@name}"
288+
event = "#{@topic}_down"
289+
assert_receive %Phoenix.Socket.Broadcast{topic: ^topic, event: ^event, payload: %{reason: ^reason, pid: ^pid}}
290+
end
291+
235292
test "it handles :syn_conflict_resolution reason" do
236293
reason = :syn_conflict_resolution
237294
pid = self()

0 commit comments

Comments
 (0)