Skip to content

Commit ecac071

Browse files
authored
Fastlane for phoenix presence_diff (#1558)
It uses a fork of Phoenix for time being * fix: count presence_diff events on MessageDispatcher * fix: remove traces from console during development
1 parent 07de665 commit ecac071

File tree

8 files changed

+113
-71
lines changed

8 files changed

+113
-71
lines changed

config/dev.exs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ config :phoenix, :plug_init_mode, :runtime
9797
# Disable caching to ensure the rendered spec is refreshed
9898
config :open_api_spex, :cache_adapter, OpenApiSpex.Plug.NoneCache
9999

100-
config :opentelemetry, traces_exporter: {:otel_exporter_stdout, []}
100+
# Disabled but can print to stdout with:
101+
# config :opentelemetry, traces_exporter: {:otel_exporter_stdout, []}
102+
config :opentelemetry, traces_exporter: :none
101103

102104
config :mix_test_watch, clear: true

lib/realtime_web/channels/presence.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,6 @@ defmodule RealtimeWeb.Presence do
88
use Phoenix.Presence,
99
otp_app: :realtime,
1010
pubsub_server: Realtime.PubSub,
11+
dispatcher: RealtimeWeb.RealtimeChannel.MessageDispatcher,
1112
pool_size: 10
1213
end

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ defmodule RealtimeWeb.RealtimeChannel do
1818
alias Realtime.Tenants.Authorization
1919
alias Realtime.Tenants.Authorization.Policies
2020
alias Realtime.Tenants.Authorization.Policies.BroadcastPolicies
21-
alias Realtime.Tenants.Authorization.Policies.PresencePolicies
2221
alias Realtime.Tenants.Connect
2322

2423
alias RealtimeWeb.Channels.Payloads.Join
@@ -259,27 +258,11 @@ defmodule RealtimeWeb.RealtimeChannel do
259258
{:noreply, assign(socket, %{pg_sub_ref: pg_sub_ref})}
260259
end
261260

262-
def handle_info(
263-
%{event: "presence_diff"},
264-
%{assigns: %{policies: %Policies{presence: %PresencePolicies{read: false}}}} = socket
265-
) do
266-
Logger.warning("Presence message ignored")
267-
{:noreply, socket}
268-
end
269-
270261
def handle_info(_msg, %{assigns: %{policies: %Policies{broadcast: %BroadcastPolicies{read: false}}}} = socket) do
271262
Logger.warning("Broadcast message ignored")
272263
{:noreply, socket}
273264
end
274265

275-
def handle_info(%{event: "presence_diff", payload: payload} = msg, socket) do
276-
%{presence_rate_counter: presence_rate_counter} = socket.assigns
277-
GenCounter.add(presence_rate_counter.id)
278-
maybe_log_info(socket, msg)
279-
push(socket, "presence_diff", payload)
280-
{:noreply, socket}
281-
end
282-
283266
def handle_info(%{event: type, payload: payload} = msg, socket) do
284267
count(socket)
285268
maybe_log_info(socket, msg)

lib/realtime_web/channels/realtime_channel/message_dispatcher.ex

Lines changed: 40 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,63 +5,67 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do
55

66
require Logger
77

8-
def fastlane_metadata(fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids \\ MapSet.new())
9-
10-
def fastlane_metadata(fastlane_pid, serializer, topic, :info, tenant_id, replayed_message_ids) do
11-
{:rc_fastlane, fastlane_pid, serializer, topic, {:log, tenant_id}, replayed_message_ids}
12-
end
13-
14-
def fastlane_metadata(fastlane_pid, serializer, topic, _log_level, _tenant_id, replayed_message_ids) do
15-
{:rc_fastlane, fastlane_pid, serializer, topic, replayed_message_ids}
8+
def fastlane_metadata(fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids \\ MapSet.new()) do
9+
{:rc_fastlane, fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids}
1610
end
1711

1812
@doc """
1913
This dispatch function caches encoded messages if fastlane is used
2014
It also sends an :update_rate_counter to the subscriber and it can conditionally log
2115
"""
2216
@spec dispatch(list, pid, Phoenix.Socket.Broadcast.t()) :: :ok
23-
def dispatch(subscribers, from, %Phoenix.Socket.Broadcast{} = msg) do
17+
def dispatch(subscribers, from, %Phoenix.Socket.Broadcast{event: event} = msg) do
2418
# fastlane_pid is the actual socket transport pid
2519
# This reduce caches the serialization and bypasses the channel process going straight to the
2620
# transport process
2721

2822
message_id = message_id(msg.payload)
2923

30-
# Credo doesn't like that we don't use the result aggregation
31-
_ =
32-
Enum.reduce(subscribers, %{}, fn
33-
{pid, _}, cache when pid == from ->
34-
cache
24+
{_cache, count} =
25+
Enum.reduce(subscribers, {%{}, 0}, fn
26+
{pid, _}, {cache, count} when pid == from ->
27+
{cache, count}
3528

36-
{pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, replayed_message_ids}}, cache ->
29+
{pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, log_level, tenant_id, replayed_message_ids}},
30+
{cache, count} ->
3731
if already_replayed?(message_id, replayed_message_ids) do
3832
# skip already replayed message
39-
cache
33+
{cache, count}
4034
else
41-
send(pid, :update_rate_counter)
42-
do_dispatch(msg, fastlane_pid, serializer, join_topic, cache)
43-
end
35+
if event != "presence_diff", do: send(pid, :update_rate_counter)
4436

45-
{pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, {:log, tenant_id}, replayed_message_ids}}, cache ->
46-
if already_replayed?(message_id, replayed_message_ids) do
47-
# skip already replayed message
48-
cache
49-
else
50-
send(pid, :update_rate_counter)
51-
log = "Received message on #{join_topic} with payload: #{inspect(msg, pretty: true)}"
52-
Logger.info(log, external_id: tenant_id, project: tenant_id)
37+
maybe_log(log_level, join_topic, msg, tenant_id)
5338

54-
do_dispatch(msg, fastlane_pid, serializer, join_topic, cache)
39+
cache = do_dispatch(msg, fastlane_pid, serializer, join_topic, cache)
40+
{cache, count + 1}
5541
end
5642

57-
{pid, _}, cache ->
43+
{pid, _}, {cache, count} ->
5844
send(pid, msg)
59-
cache
45+
{cache, count}
6046
end)
6147

48+
tenant_id = tenant_id(subscribers)
49+
increment_presence_counter(tenant_id, event, count)
50+
6251
:ok
6352
end
6453

54+
defp increment_presence_counter(tenant_id, "presence_diff", count) when is_binary(tenant_id) do
55+
tenant_id
56+
|> Realtime.Tenants.presence_events_per_second_key()
57+
|> Realtime.GenCounter.add(count)
58+
end
59+
60+
defp increment_presence_counter(_tenant_id, _event, _count), do: :ok
61+
62+
defp maybe_log(:info, join_topic, msg, tenant_id) do
63+
log = "Received message on #{join_topic} with payload: #{inspect(msg, pretty: true)}"
64+
Logger.info(log, external_id: tenant_id, project: tenant_id)
65+
end
66+
67+
defp maybe_log(_level, _join_topic, _msg, _tenant_id), do: :ok
68+
6569
defp message_id(%{"meta" => %{"id" => id}}), do: id
6670
defp message_id(_), do: nil
6771

@@ -82,4 +86,10 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do
8286
Map.put(cache, serializer, encoded_msg)
8387
end
8488
end
89+
90+
defp tenant_id([{_pid, {:rc_fastlane, _, _, _, _, tenant_id, _}} | _]) do
91+
tenant_id
92+
end
93+
94+
defp tenant_id(_), do: nil
8595
end

mix.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.52.1",
7+
version: "2.53.0",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,
@@ -53,7 +53,7 @@ defmodule Realtime.MixProject do
5353
# Type `mix help deps` for examples and options.
5454
defp deps do
5555
[
56-
{:phoenix, "~> 1.7.0"},
56+
{:phoenix, override: true, github: "supabase/phoenix", branch: "feat/presence-custom-dispatcher-1.7.19"},
5757
{:phoenix_ecto, "~> 4.4.0"},
5858
{:ecto_sql, "~> 3.11"},
5959
{:ecto_psql_extras, "~> 0.8"},

mix.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
"opentelemetry_semantic_conventions": {:hex, :opentelemetry_semantic_conventions, "1.27.0", "acd0194a94a1e57d63da982ee9f4a9f88834ae0b31b0bd850815fe9be4bbb45f", [:mix, :rebar3], [], "hexpm", "9681ccaa24fd3d810b4461581717661fd85ff7019b082c2dff89c7d5b1fc2864"},
6767
"opentelemetry_telemetry": {:hex, :opentelemetry_telemetry, "1.1.2", "410ab4d76b0921f42dbccbe5a7c831b8125282850be649ee1f70050d3961118a", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.3", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "641ab469deb181957ac6d59bce6e1321d5fe2a56df444fc9c19afcad623ab253"},
6868
"otel_http": {:hex, :otel_http, "0.2.0", "b17385986c7f1b862f5d577f72614ecaa29de40392b7618869999326b9a61d8a", [:rebar3], [], "hexpm", "f2beadf922c8cfeb0965488dd736c95cc6ea8b9efce89466b3904d317d7cc717"},
69-
"phoenix": {:hex, :phoenix, "1.7.19", "36617efe5afbd821099a8b994ff4618a340a5bfb25531a1802c4d4c634017a57", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "ba4dc14458278773f905f8ae6c2ec743d52c3a35b6b353733f64f02dfe096cd6"},
69+
"phoenix": {:git, "https://github.com/supabase/phoenix.git", "7b884cc0cc1a49ad2bc272acda2e622b3e11c139", [branch: "feat/presence-custom-dispatcher-1.7.19"]},
7070
"phoenix_ecto": {:hex, :phoenix_ecto, "4.4.3", "86e9878f833829c3f66da03d75254c155d91d72a201eb56ae83482328dc7ca93", [:mix], [{:ecto, "~> 3.5", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "d36c401206f3011fefd63d04e8ef626ec8791975d9d107f9a0817d426f61ac07"},
7171
"phoenix_html": {:hex, :phoenix_html, "3.3.4", "42a09fc443bbc1da37e372a5c8e6755d046f22b9b11343bf885067357da21cb3", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "0249d3abec3714aff3415e7ee3d9786cb325be3151e6c4b3021502c585bf53fb"},
7272
"phoenix_live_dashboard": {:hex, :phoenix_live_dashboard, "0.8.6", "7b1f0327f54c9eb69845fd09a77accf922f488c549a7e7b8618775eb603a62c7", [:mix], [{:ecto, "~> 3.6.2 or ~> 3.7", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_mysql_extras, "~> 0.5", [hex: :ecto_mysql_extras, repo: "hexpm", optional: true]}, {:ecto_psql_extras, "~> 0.7", [hex: :ecto_psql_extras, repo: "hexpm", optional: true]}, {:ecto_sqlite3_extras, "~> 1.1.7 or ~> 1.2.0", [hex: :ecto_sqlite3_extras, repo: "hexpm", optional: true]}, {:mime, "~> 1.6 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:phoenix_live_view, "~> 0.19 or ~> 1.0", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6 or ~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "1681ab813ec26ca6915beb3414aa138f298e17721dc6a2bde9e6eb8a62360ff6"},

test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,24 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do
1616
describe "fastlane_metadata/5" do
1717
test "info level" do
1818
assert MessageDispatcher.fastlane_metadata(self(), Serializer, "realtime:topic", :info, "tenant_id") ==
19-
{:rc_fastlane, self(), Serializer, "realtime:topic", {:log, "tenant_id"}, MapSet.new()}
19+
{:rc_fastlane, self(), Serializer, "realtime:topic", :info, "tenant_id", MapSet.new()}
2020
end
2121

2222
test "non-info level" do
2323
assert MessageDispatcher.fastlane_metadata(self(), Serializer, "realtime:topic", :warning, "tenant_id") ==
24-
{:rc_fastlane, self(), Serializer, "realtime:topic", MapSet.new()}
24+
{:rc_fastlane, self(), Serializer, "realtime:topic", :warning, "tenant_id", MapSet.new()}
25+
end
26+
27+
test "replayed message ids" do
28+
assert MessageDispatcher.fastlane_metadata(
29+
self(),
30+
Serializer,
31+
"realtime:topic",
32+
:warning,
33+
"tenant_id",
34+
MapSet.new([1])
35+
) ==
36+
{:rc_fastlane, self(), Serializer, "realtime:topic", :warning, "tenant_id", MapSet.new([1])}
2537
end
2638
end
2739

@@ -50,8 +62,8 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do
5062
from_pid = :erlang.list_to_pid(~c'<0.2.1>')
5163

5264
subscribers = [
53-
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}, MapSet.new()}},
54-
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", MapSet.new()}}
65+
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :info, "tenant123", MapSet.new()}},
66+
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :warning, "tenant123", MapSet.new()}}
5567
]
5668

5769
msg = %Broadcast{topic: "some:other:topic", event: "event", payload: %{data: "test"}}
@@ -74,6 +86,48 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do
7486
refute_receive _any
7587
end
7688

89+
test "dispatches 'presence_diff' messages to fastlane subscribers" do
90+
parent = self()
91+
92+
subscriber_pid =
93+
spawn(fn ->
94+
loop = fn loop ->
95+
receive do
96+
msg ->
97+
send(parent, {:subscriber, msg})
98+
loop.(loop)
99+
end
100+
end
101+
102+
loop.(loop)
103+
end)
104+
105+
from_pid = :erlang.list_to_pid(~c'<0.2.1>')
106+
107+
subscribers = [
108+
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :info, "tenant456", MapSet.new()}},
109+
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :warning, "tenant456", MapSet.new()}}
110+
]
111+
112+
msg = %Broadcast{topic: "some:other:topic", event: "presence_diff", payload: %{data: "test"}}
113+
114+
log =
115+
capture_log(fn ->
116+
assert MessageDispatcher.dispatch(subscribers, from_pid, msg) == :ok
117+
end)
118+
119+
assert log =~ "Received message on realtime:topic with payload: #{inspect(msg, pretty: true)}"
120+
121+
assert_receive {:encoded, %Broadcast{event: "presence_diff", payload: %{data: "test"}, topic: "realtime:topic"}}
122+
assert_receive {:encoded, %Broadcast{event: "presence_diff", payload: %{data: "test"}, topic: "realtime:topic"}}
123+
124+
assert Agent.get(TestSerializer, & &1) == 1
125+
126+
assert Realtime.GenCounter.get(Realtime.Tenants.presence_events_per_second_key("tenant456")) == 2
127+
128+
refute_receive _any
129+
end
130+
77131
test "does not dispatch messages to fastlane subscribers if they already replayed it" do
78132
parent = self()
79133

@@ -95,8 +149,9 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do
95149

96150
subscribers = [
97151
{subscriber_pid,
98-
{:rc_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}, replaeyd_message_ids}},
99-
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", replaeyd_message_ids}}
152+
{:rc_fastlane, self(), TestSerializer, "realtime:topic", :info, "tenant123", replaeyd_message_ids}},
153+
{subscriber_pid,
154+
{:rc_fastlane, self(), TestSerializer, "realtime:topic", :warning, "tenant123", replaeyd_message_ids}}
100155
]
101156

102157
msg = %Broadcast{
@@ -131,8 +186,8 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do
131186
from_pid = :erlang.list_to_pid(~c'<0.2.1>')
132187

133188
subscribers = [
134-
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}, MapSet.new()}},
135-
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", MapSet.new()}}
189+
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :info, "tenant123", MapSet.new()}},
190+
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :warning, "tenant123", MapSet.new()}}
136191
]
137192

138193
msg = %Broadcast{topic: "some:other:topic", event: "event", payload: "not a map"}

test/realtime_web/channels/realtime_channel_test.exs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -239,23 +239,14 @@ defmodule RealtimeWeb.RealtimeChannelTest do
239239
end
240240

241241
describe "presence" do
242-
test "events are counted", %{tenant: tenant} do
242+
test "presence state event is counted", %{tenant: tenant} do
243243
jwt = Generators.generate_jwt_token(tenant)
244244
{:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt))
245245

246246
assert {:ok, _, %Socket{} = socket} = subscribe_and_join(socket, "realtime:test", %{})
247247

248-
presence_diff = %Socket.Broadcast{event: "presence_diff", payload: %{joins: %{}, leaves: %{}}}
249-
send(socket.channel_pid, presence_diff)
250-
251248
assert_receive %Socket.Message{topic: "realtime:test", event: "presence_state", payload: %{}}
252249

253-
assert_receive %Socket.Message{
254-
topic: "realtime:test",
255-
event: "presence_diff",
256-
payload: %{joins: %{}, leaves: %{}}
257-
}
258-
259250
tenant_id = tenant.external_id
260251

261252
# Wait for RateCounter to tick
@@ -264,8 +255,8 @@ defmodule RealtimeWeb.RealtimeChannelTest do
264255
assert {:ok, %RateCounter{id: {:channel, :presence_events, ^tenant_id}, bucket: bucket}} =
265256
RateCounter.get(socket.assigns.presence_rate_counter)
266257

267-
# presence_state + presence_diff
268-
assert 2 in bucket
258+
# presence_state
259+
assert Enum.sum(bucket) == 1
269260
end
270261
end
271262

0 commit comments

Comments
 (0)