Skip to content

Commit 16bd44d

Browse files
authored
feat: presence payload size (#1559)
* Also tweak buckets to account all the way to 3000KB * Start tagging the payload size metrics with message_type. message_type can be presence, broadcast or postgres_changes
1 parent e9eaf9f commit 16bd44d

File tree

12 files changed

+149
-47
lines changed

12 files changed

+149
-47
lines changed

lib/extensions/postgres_cdc_rls/replication_poller.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
183183
change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
184184
topic = "realtime:postgres:" <> tenant_id
185185

186-
RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher)
186+
RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher, :postgres_changes)
187187
end
188188

189189
{:ok, rows_count}

lib/realtime/monitoring/prom_ex/plugins/tenant.ex

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,20 +36,21 @@ defmodule Realtime.PromEx.Plugins.Tenant do
3636
event_name: [:realtime, :tenants, :payload, :size],
3737
measurement: :size,
3838
description: "Tenant payload size",
39-
tags: [:tenant],
39+
tags: [:tenant, :message_type],
4040
unit: :byte,
4141
reporter_options: [
42-
buckets: [100, 250, 500, 1000, 2000, 3000, 5000, 10_000, 25_000]
42+
buckets: [250, 500, 1000, 3000, 5000, 10_000, 25_000, 100_000, 500_000, 1_000_000, 3_000_000]
4343
]
4444
),
4545
distribution(
4646
[:realtime, :payload, :size],
4747
event_name: [:realtime, :tenants, :payload, :size],
4848
measurement: :size,
4949
description: "Payload size",
50+
tags: [:message_type],
5051
unit: :byte,
5152
reporter_options: [
52-
buckets: [100, 250, 500, 1000, 2000, 3000, 5000, 10_000, 25_000]
53+
buckets: [250, 500, 1000, 3000, 5000, 10_000, 25_000, 100_000, 500_000, 1_000_000, 3_000_000]
5354
]
5455
)
5556
]

lib/realtime/tenants/batch_broadcast.ex

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,14 @@ defmodule Realtime.Tenants.BatchBroadcast do
129129
broadcast = %Phoenix.Socket.Broadcast{topic: message.topic, event: @event_type, payload: payload}
130130

131131
GenCounter.add(events_per_second_rate.id)
132-
TenantBroadcaster.pubsub_broadcast(tenant.external_id, tenant_topic, broadcast, RealtimeChannel.MessageDispatcher)
132+
133+
TenantBroadcaster.pubsub_broadcast(
134+
tenant.external_id,
135+
tenant_topic,
136+
broadcast,
137+
RealtimeChannel.MessageDispatcher,
138+
:broadcast
139+
)
133140
end
134141

135142
defp permissions_for_message(_, nil, _), do: nil

lib/realtime_web/channels/realtime_channel/broadcast_handler.ex

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,21 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
7676
broadcast = %Phoenix.Socket.Broadcast{topic: tenant_topic, event: @event_type, payload: payload}
7777

7878
if self_broadcast do
79-
TenantBroadcaster.pubsub_broadcast(tenant_id, tenant_topic, broadcast, RealtimeChannel.MessageDispatcher)
79+
TenantBroadcaster.pubsub_broadcast(
80+
tenant_id,
81+
tenant_topic,
82+
broadcast,
83+
RealtimeChannel.MessageDispatcher,
84+
:broadcast
85+
)
8086
else
8187
TenantBroadcaster.pubsub_broadcast_from(
8288
tenant_id,
8389
self(),
8490
tenant_topic,
8591
broadcast,
86-
RealtimeChannel.MessageDispatcher
92+
RealtimeChannel.MessageDispatcher,
93+
:broadcast
8794
)
8895
end
8996
end

lib/realtime_web/channels/realtime_channel/presence_handler.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
109109

110110
%{assigns: %{presence_key: presence_key, tenant_topic: tenant_topic}} = socket
111111
payload = Map.get(payload, "payload", %{})
112+
RealtimeWeb.TenantBroadcaster.collect_payload_size(socket.assigns.tenant, payload, :presence)
112113

113114
with :ok <- limit_presence_event(socket),
114115
{:ok, _} <- Presence.track(self(), tenant_topic, presence_key, payload) do

lib/realtime_web/tenant_broadcaster.ex

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@ defmodule RealtimeWeb.TenantBroadcaster do
55

66
alias Phoenix.PubSub
77

8-
@spec pubsub_broadcast(tenant_id :: String.t(), PubSub.topic(), PubSub.message(), PubSub.dispatcher()) :: :ok
9-
def pubsub_broadcast(tenant_id, topic, message, dispatcher) do
10-
collect_payload_size(tenant_id, message)
8+
@type message_type :: :broadcast | :presence | :postgres_changes
9+
10+
@spec pubsub_broadcast(tenant_id :: String.t(), PubSub.topic(), PubSub.message(), PubSub.dispatcher(), message_type) ::
11+
:ok
12+
def pubsub_broadcast(tenant_id, topic, message, dispatcher, message_type) do
13+
collect_payload_size(tenant_id, message, message_type)
1114

1215
if pubsub_adapter() == :gen_rpc do
1316
PubSub.broadcast(Realtime.PubSub, topic, message, dispatcher)
@@ -23,11 +26,12 @@ defmodule RealtimeWeb.TenantBroadcaster do
2326
from :: pid,
2427
PubSub.topic(),
2528
PubSub.message(),
26-
PubSub.dispatcher()
29+
PubSub.dispatcher(),
30+
message_type
2731
) ::
2832
:ok
29-
def pubsub_broadcast_from(tenant_id, from, topic, message, dispatcher) do
30-
collect_payload_size(tenant_id, message)
33+
def pubsub_broadcast_from(tenant_id, from, topic, message, dispatcher, message_type) do
34+
collect_payload_size(tenant_id, message, message_type)
3135

3236
if pubsub_adapter() == :gen_rpc do
3337
PubSub.broadcast_from(Realtime.PubSub, from, topic, message, dispatcher)
@@ -45,16 +49,18 @@ defmodule RealtimeWeb.TenantBroadcaster do
4549

4650
@payload_size_event [:realtime, :tenants, :payload, :size]
4751

48-
defp collect_payload_size(tenant_id, payload) when is_struct(payload) do
52+
@spec collect_payload_size(tenant_id :: String.t(), payload :: term, message_type :: message_type) :: :ok
53+
def collect_payload_size(tenant_id, payload, message_type) when is_struct(payload) do
4954
# Extracting from struct so the __struct__ bit is not calculated as part of the payload
50-
collect_payload_size(tenant_id, Map.from_struct(payload))
55+
collect_payload_size(tenant_id, Map.from_struct(payload), message_type)
5156
end
5257

53-
defp collect_payload_size(tenant_id, payload) do
54-
:telemetry.execute(@payload_size_event, %{size: :erlang.external_size(payload)}, %{tenant: tenant_id})
58+
def collect_payload_size(tenant_id, payload, message_type) do
59+
:telemetry.execute(@payload_size_event, %{size: :erlang.external_size(payload)}, %{
60+
tenant: tenant_id,
61+
message_type: message_type
62+
})
5563
end
5664

57-
defp pubsub_adapter do
58-
Application.fetch_env!(:realtime, :pubsub_adapter)
59-
end
65+
defp pubsub_adapter, do: Application.fetch_env!(:realtime, :pubsub_adapter)
6066
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.51.15",
7+
version: "2.52.0",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/realtime/extensions/cdc_rls/cdc_rls_test.exs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,15 @@ defmodule Realtime.Extensions.CdcRlsTest do
236236

237237
RateCounter.stop(tenant.external_id)
238238

239+
on_exit(fn -> :telemetry.detach(__MODULE__) end)
240+
241+
:telemetry.attach(
242+
__MODULE__,
243+
[:realtime, :tenants, :payload, :size],
244+
&__MODULE__.handle_telemetry/4,
245+
pid: self()
246+
)
247+
239248
%{tenant: tenant, conn: conn}
240249
end
241250

@@ -317,6 +326,13 @@ defmodule Realtime.Extensions.CdcRlsTest do
317326

318327
assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} = RateCounter.get(rate)
319328
assert 1 in bucket
329+
330+
assert_receive {
331+
:telemetry,
332+
[:realtime, :tenants, :payload, :size],
333+
%{size: 341},
334+
%{tenant: "dev_tenant", message_type: :postgres_changes}
335+
}
320336
end
321337

322338
@aux_mod (quote do
@@ -414,4 +430,6 @@ defmodule Realtime.Extensions.CdcRlsTest do
414430
:erpc.call(node, PostgresCdcRls, :handle_stop, [tenant.external_id, 10_000])
415431
end
416432
end
433+
434+
def handle_telemetry(event, measures, metadata, pid: pid), do: send(pid, {:telemetry, event, measures, metadata})
417435
end

test/realtime/monitoring/prom_ex/plugins/tenant_test.exs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -262,36 +262,36 @@ defmodule Realtime.PromEx.Plugins.TenantTest do
262262
external_id = context.tenant.external_id
263263

264264
pattern =
265-
~r/realtime_tenants_payload_size_count{tenant="#{external_id}"}\s(?<number>\d+)/
265+
~r/realtime_tenants_payload_size_count{message_type=\"presence\",tenant="#{external_id}"}\s(?<number>\d+)/
266266

267267
metric_value = metric_value(pattern)
268268

269269
message = %{topic: "a topic", event: "an event", payload: ["a", %{"b" => "c"}, 1, 23]}
270-
RealtimeWeb.TenantBroadcaster.pubsub_broadcast(external_id, "a topic", message, Phoenix.PubSub)
270+
RealtimeWeb.TenantBroadcaster.pubsub_broadcast(external_id, "a topic", message, Phoenix.PubSub, :presence)
271271

272272
Process.sleep(200)
273273
assert metric_value(pattern) == metric_value + 1
274274

275275
bucket_pattern =
276-
~r/realtime_tenants_payload_size_bucket{tenant="#{external_id}",le="100"}\s(?<number>\d+)/
276+
~r/realtime_tenants_payload_size_bucket{message_type=\"presence\",tenant="#{external_id}",le="250"}\s(?<number>\d+)/
277277

278278
assert metric_value(bucket_pattern) > 0
279279
end
280280

281281
test "global metric payload size", context do
282282
external_id = context.tenant.external_id
283283

284-
pattern = ~r/realtime_payload_size_count\s(?<number>\d+)/
284+
pattern = ~r/realtime_payload_size_count{message_type=\"broadcast\"}\s(?<number>\d+)/
285285

286286
metric_value = metric_value(pattern)
287287

288288
message = %{topic: "a topic", event: "an event", payload: ["a", %{"b" => "c"}, 1, 23]}
289-
RealtimeWeb.TenantBroadcaster.pubsub_broadcast(external_id, "a topic", message, Phoenix.PubSub)
289+
RealtimeWeb.TenantBroadcaster.pubsub_broadcast(external_id, "a topic", message, Phoenix.PubSub, :broadcast)
290290

291291
Process.sleep(200)
292292
assert metric_value(pattern) == metric_value + 1
293293

294-
bucket_pattern = ~r/realtime_payload_size_bucket{le="100"}\s(?<number>\d+)/
294+
bucket_pattern = ~r/realtime_payload_size_bucket{message_type=\"broadcast\",le="250"}\s(?<number>\d+)/
295295

296296
assert metric_value(bucket_pattern) > 0
297297
end

test/realtime_web/channels/realtime_channel/presence_handler_test.exs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,25 +100,41 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
100100
end
101101

102102
describe "handle/3" do
103+
setup do
104+
on_exit(fn -> :telemetry.detach(__MODULE__) end)
105+
106+
:telemetry.attach(
107+
__MODULE__,
108+
[:realtime, :tenants, :payload, :size],
109+
&__MODULE__.handle_telemetry/4,
110+
pid: self()
111+
)
112+
end
113+
103114
test "with true policy and is private, user can track their presence and changes", %{
104115
tenant: tenant,
105116
topic: topic,
106117
db_conn: db_conn
107118
} do
119+
external_id = tenant.external_id
108120
key = random_string()
109121
policies = %Policies{presence: %PresencePolicies{read: true, write: true}}
110122

111123
socket =
112124
socket_fixture(tenant, topic, key, policies: policies)
113125

114-
PresenceHandler.handle(%{"event" => "track"}, db_conn, socket)
126+
PresenceHandler.handle(%{"event" => "track", "payload" => %{"A" => "b", "c" => "b"}}, db_conn, socket)
115127
topic = socket.assigns.tenant_topic
116128

117129
assert_receive %Broadcast{topic: ^topic, event: "presence_diff", payload: %{joins: joins, leaves: %{}}}
118130
assert Map.has_key?(joins, key)
131+
132+
assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 30},
133+
%{tenant: ^external_id, message_type: :presence}}
119134
end
120135

121136
test "when tracking already existing user, metadata updated", %{tenant: tenant, topic: topic, db_conn: db_conn} do
137+
external_id = tenant.external_id
122138
key = random_string()
123139
policies = %Policies{presence: %PresencePolicies{read: true, write: true}}
124140
socket = socket_fixture(tenant, topic, key, policies: policies)
@@ -134,10 +150,18 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
134150

135151
assert_receive %Broadcast{topic: ^topic, event: "presence_diff", payload: %{joins: joins, leaves: %{}}}
136152
assert Map.has_key?(joins, key)
153+
154+
assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 6},
155+
%{tenant: ^external_id, message_type: :presence}}
156+
157+
assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 55},
158+
%{tenant: ^external_id, message_type: :presence}}
159+
137160
refute_receive :_
138161
end
139162

140163
test "with false policy and is public, user can track their presence and changes", %{tenant: tenant, topic: topic} do
164+
external_id = tenant.external_id
141165
key = random_string()
142166
policies = %Policies{presence: %PresencePolicies{read: false, write: false}}
143167
socket = socket_fixture(tenant, topic, key, policies: policies, private?: false)
@@ -147,6 +171,9 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
147171
topic = socket.assigns.tenant_topic
148172
assert_receive %Broadcast{topic: ^topic, event: "presence_diff", payload: %{joins: joins, leaves: %{}}}
149173
assert Map.has_key?(joins, key)
174+
175+
assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 6},
176+
%{tenant: ^external_id, message_type: :presence}}
150177
end
151178

152179
test "user can untrack when they want", %{tenant: tenant, topic: topic, db_conn: db_conn} do
@@ -518,4 +545,6 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
518545
}
519546
}
520547
end
548+
549+
def handle_telemetry(event, measures, metadata, pid: pid), do: send(pid, {:telemetry, event, measures, metadata})
521550
end

0 commit comments

Comments
 (0)