Skip to content

Commit 54cd3f7

Browse files
authored
fix: make pubsub adapter configurable (#1539)
1 parent 9a21897 commit 54cd3f7

File tree

5 files changed

+108
-68
lines changed

5 files changed

+108
-68
lines changed

config/runtime.exs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ janitor_children_timeout = Env.get_integer("JANITOR_CHILDREN_TIMEOUT", :timer.se
6868
janitor_schedule_timer = Env.get_integer("JANITOR_SCHEDULE_TIMER_IN_MS", :timer.hours(4))
6969
platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws, else: :fly
7070
broadcast_pool_size = Env.get_integer("BROADCAST_POOL_SIZE", 10)
71+
pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "pg2") |> String.to_atom()
7172
websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_000), :erlang.system_info(:wordsize))
7273

7374
no_channel_timeout_in_ms =
@@ -124,6 +125,7 @@ config :realtime,
124125
max_gen_rpc_clients: max_gen_rpc_clients,
125126
no_channel_timeout_in_ms: no_channel_timeout_in_ms,
126127
platform: platform,
128+
pubsub_adapter: pubsub_adapter,
127129
broadcast_pool_size: broadcast_pool_size
128130

129131
if config_env() != :test && run_janitor? do

lib/realtime/application.ex

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ defmodule Realtime.Application do
6767
RealtimeWeb.Telemetry,
6868
{Cluster.Supervisor, [topologies, [name: Realtime.ClusterSupervisor]]},
6969
{Phoenix.PubSub,
70-
name: Realtime.PubSub, pool_size: 10, adapter: Realtime.GenRpcPubSub, broadcast_pool_size: broadcast_pool_size},
70+
name: Realtime.PubSub, pool_size: 10, adapter: pubsub_adapter(), broadcast_pool_size: broadcast_pool_size},
7171
{Cachex, name: Realtime.RateCounter},
7272
Realtime.Tenants.Cache,
7373
Realtime.RateCounter.DynamicSupervisor,
@@ -154,4 +154,12 @@ defmodule Realtime.Application do
154154
OpentelemetryPhoenix.setup(adapter: :cowboy2)
155155
OpentelemetryEcto.setup([:realtime, :repo], db_statement: :enabled)
156156
end
157+
158+
defp pubsub_adapter do
159+
if Application.fetch_env!(:realtime, :pubsub_adapter) == :gen_rpc do
160+
Realtime.GenRpcPubSub
161+
else
162+
Phoenix.PubSub.PG2
163+
end
164+
end
157165
end

lib/realtime_web/tenant_broadcaster.ex

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@ defmodule RealtimeWeb.TenantBroadcaster do
99
def pubsub_broadcast(tenant_id, topic, message, dispatcher) do
1010
collect_payload_size(tenant_id, message)
1111

12-
PubSub.broadcast(Realtime.PubSub, topic, message, dispatcher)
12+
if pubsub_adapter() == :gen_rpc do
13+
PubSub.broadcast(Realtime.PubSub, topic, message, dispatcher)
14+
else
15+
Realtime.GenRpc.multicast(PubSub, :local_broadcast, [Realtime.PubSub, topic, message, dispatcher], key: topic)
16+
end
1317

1418
:ok
1519
end
@@ -25,7 +29,17 @@ defmodule RealtimeWeb.TenantBroadcaster do
2529
def pubsub_broadcast_from(tenant_id, from, topic, message, dispatcher) do
2630
collect_payload_size(tenant_id, message)
2731

28-
PubSub.broadcast_from(Realtime.PubSub, from, topic, message, dispatcher)
32+
if pubsub_adapter() == :gen_rpc do
33+
PubSub.broadcast_from(Realtime.PubSub, from, topic, message, dispatcher)
34+
else
35+
Realtime.GenRpc.multicast(
36+
PubSub,
37+
:local_broadcast_from,
38+
[Realtime.PubSub, from, topic, message, dispatcher],
39+
key: topic
40+
)
41+
end
42+
2943
:ok
3044
end
3145

@@ -39,4 +53,8 @@ defmodule RealtimeWeb.TenantBroadcaster do
3953
defp collect_payload_size(tenant_id, payload) do
4054
:telemetry.execute(@payload_size_event, %{size: :erlang.external_size(payload)}, %{tenant: tenant_id})
4155
end
56+
57+
defp pubsub_adapter do
58+
Application.fetch_env!(:realtime, :pubsub_adapter)
59+
end
4260
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.51.2",
7+
version: "2.51.3",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/realtime_web/tenant_broadcaster_test.exs

Lines changed: 76 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
defmodule RealtimeWeb.TenantBroadcasterTest do
2-
# Usage of Clustered
2+
# Usage of Clustered and changing Application env
33
use Realtime.DataCase, async: false
44

55
alias Phoenix.Socket.Broadcast
@@ -47,95 +47,107 @@ defmodule RealtimeWeb.TenantBroadcasterTest do
4747
pid: self()
4848
)
4949

50+
original = Application.fetch_env!(:realtime, :pubsub_adapter)
51+
on_exit(fn -> Application.put_env(:realtime, :pubsub_adapter, original) end)
52+
Application.put_env(:realtime, :pubsub_adapter, context.pubsub_adapter)
53+
5054
:ok
5155
end
5256

53-
describe "pubsub_broadcast/4" do
54-
test "pubsub_broadcast", %{node: node} do
55-
message = %Broadcast{topic: @topic, event: "an event", payload: %{"a" => "b"}}
56-
TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub)
57+
for pubsub_adapter <- [:gen_rpc, :pg2] do
58+
describe "pubsub_broadcast/4 #{pubsub_adapter}" do
59+
@describetag pubsub_adapter: pubsub_adapter
5760

58-
assert_receive ^message
61+
test "pubsub_broadcast", %{node: node} do
62+
message = %Broadcast{topic: @topic, event: "an event", payload: %{"a" => "b"}}
63+
TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub)
5964

60-
# Remote node received the broadcast
61-
assert_receive {:relay, ^node, ^message}
65+
assert_receive ^message
6266

63-
assert_receive {
64-
:telemetry,
65-
[:realtime, :tenants, :payload, :size],
66-
%{size: 114},
67-
%{tenant: "realtime-dev"}
68-
}
69-
end
67+
# Remote node received the broadcast
68+
assert_receive {:relay, ^node, ^message}
7069

71-
test "pubsub_broadcast list payload", %{node: node} do
72-
message = %Broadcast{topic: @topic, event: "an event", payload: ["a", %{"b" => "c"}, 1, 23]}
73-
TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub)
70+
assert_receive {
71+
:telemetry,
72+
[:realtime, :tenants, :payload, :size],
73+
%{size: 114},
74+
%{tenant: "realtime-dev"}
75+
}
76+
end
7477

75-
assert_receive ^message
78+
test "pubsub_broadcast list payload", %{node: node} do
79+
message = %Broadcast{topic: @topic, event: "an event", payload: ["a", %{"b" => "c"}, 1, 23]}
80+
TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub)
7681

77-
# Remote node received the broadcast
78-
assert_receive {:relay, ^node, ^message}
82+
assert_receive ^message
7983

80-
assert_receive {
81-
:telemetry,
82-
[:realtime, :tenants, :payload, :size],
83-
%{size: 130},
84-
%{tenant: "realtime-dev"}
85-
}
86-
end
84+
# Remote node received the broadcast
85+
assert_receive {:relay, ^node, ^message}
8786

88-
test "pubsub_broadcast string payload", %{node: node} do
89-
message = %Broadcast{topic: @topic, event: "an event", payload: "some text payload"}
90-
TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub)
87+
assert_receive {
88+
:telemetry,
89+
[:realtime, :tenants, :payload, :size],
90+
%{size: 130},
91+
%{tenant: "realtime-dev"}
92+
}
93+
end
9194

92-
assert_receive ^message
95+
test "pubsub_broadcast string payload", %{node: node} do
96+
message = %Broadcast{topic: @topic, event: "an event", payload: "some text payload"}
97+
TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub)
9398

94-
# Remote node received the broadcast
95-
assert_receive {:relay, ^node, ^message}
99+
assert_receive ^message
96100

97-
assert_receive {
98-
:telemetry,
99-
[:realtime, :tenants, :payload, :size],
100-
%{size: 119},
101-
%{tenant: "realtime-dev"}
102-
}
101+
# Remote node received the broadcast
102+
assert_receive {:relay, ^node, ^message}
103+
104+
assert_receive {
105+
:telemetry,
106+
[:realtime, :tenants, :payload, :size],
107+
%{size: 119},
108+
%{tenant: "realtime-dev"}
109+
}
110+
end
103111
end
104112
end
105113

106-
describe "pubsub_broadcast_from/5" do
107-
test "pubsub_broadcast_from", %{node: node} do
108-
parent = self()
114+
for pubsub_adapter <- [:gen_rpc, :pg2] do
115+
describe "pubsub_broadcast_from/5 #{pubsub_adapter}" do
116+
@describetag pubsub_adapter: pubsub_adapter
117+
118+
test "pubsub_broadcast_from", %{node: node} do
119+
parent = self()
109120

110-
spawn_link(fn ->
111-
Endpoint.subscribe(@topic)
112-
send(parent, :ready)
121+
spawn_link(fn ->
122+
Endpoint.subscribe(@topic)
123+
send(parent, :ready)
113124

114-
receive do
115-
msg -> send(parent, {:other_process, msg})
116-
end
117-
end)
125+
receive do
126+
msg -> send(parent, {:other_process, msg})
127+
end
128+
end)
118129

119-
assert_receive :ready
130+
assert_receive :ready
120131

121-
message = %Broadcast{topic: @topic, event: "an event", payload: %{"a" => "b"}}
132+
message = %Broadcast{topic: @topic, event: "an event", payload: %{"a" => "b"}}
122133

123-
TenantBroadcaster.pubsub_broadcast_from("realtime-dev", self(), @topic, message, Phoenix.PubSub)
134+
TenantBroadcaster.pubsub_broadcast_from("realtime-dev", self(), @topic, message, Phoenix.PubSub)
124135

125-
assert_receive {:other_process, ^message}
136+
assert_receive {:other_process, ^message}
126137

127-
# Remote node received the broadcast
128-
assert_receive {:relay, ^node, ^message}
138+
# Remote node received the broadcast
139+
assert_receive {:relay, ^node, ^message}
129140

130-
assert_receive {
131-
:telemetry,
132-
[:realtime, :tenants, :payload, :size],
133-
%{size: 114},
134-
%{tenant: "realtime-dev"}
135-
}
141+
assert_receive {
142+
:telemetry,
143+
[:realtime, :tenants, :payload, :size],
144+
%{size: 114},
145+
%{tenant: "realtime-dev"}
146+
}
136147

137-
# This process does not receive the message
138-
refute_receive _any
148+
# This process does not receive the message
149+
refute_receive _any
150+
end
139151
end
140152
end
141153

0 commit comments

Comments
 (0)