Skip to content

Commit c4ba2aa

Browse files
authored
feat: gen_rpc pub sub adapter (#1529)
Add a PubSub adapter that uses gen_rpc to send messages to other nodes. It uses :gen_rpc.abcast/3 instead of :erlang.send/2 The adapter works very similarly to the PG2 adapter. It consists of multiple workers that forward to the local node using PubSub.local_broadcast. The way to choose the worker to be used is based on the sending process just like PG2 adapter does The number of workers is controlled by `:pool_size` or `:broadcast_pool_size`. This distinction exists because Phoenix.PubSub uses `:pool_size` to define how many partitions the PubSub registry will use. It's possible to control them separately by using `:broadcast_pool_size`
1 parent 5ccea17 commit c4ba2aa

File tree

10 files changed

+142
-13
lines changed

10 files changed

+142
-13
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,8 @@ If you're using the default tenant, the URL is `ws://realtime-dev.localhost:4000
190190
| MAX_GEN_RPC_CLIENTS | number | Max amount of `gen_rpc` TCP connections per node-to-node channel |
191191
| REBALANCE_CHECK_INTERVAL_IN_MS | number | Time in ms to check if process is in the right region |
192192
| DISCONNECT_SOCKET_ON_NO_CHANNELS_INTERVAL_IN_MS | number | Time in ms to check if a socket has no channels open and if so, disconnect it |
193+
| BROADCAST_POOL_SIZE | number | Number of processes to relay Phoenix.PubSub messages across the cluster |
194+
193195

194196
The OpenTelemetry variables mentioned above are not an exhaustive list of all [supported environment variables](https://opentelemetry.io/docs/languages/sdk-configuration/).
195197

config/runtime.exs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ janitor_run_after_in_ms = Env.get_integer("JANITOR_RUN_AFTER_IN_MS", :timer.minu
6767
janitor_children_timeout = Env.get_integer("JANITOR_CHILDREN_TIMEOUT", :timer.seconds(5))
6868
janitor_schedule_timer = Env.get_integer("JANITOR_SCHEDULE_TIMER_IN_MS", :timer.hours(4))
6969
platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws, else: :fly
70+
broadcast_pool_size = Env.get_integer("BROADCAST_POOL_SIZE", 10)
7071

7172
no_channel_timeout_in_ms =
7273
if config_env() == :test,
@@ -120,7 +121,8 @@ config :realtime,
120121
rpc_timeout: rpc_timeout,
121122
max_gen_rpc_clients: max_gen_rpc_clients,
122123
no_channel_timeout_in_ms: no_channel_timeout_in_ms,
123-
platform: platform
124+
platform: platform,
125+
broadcast_pool_size: broadcast_pool_size
124126

125127
if config_env() != :test && run_janitor? do
126128
config :realtime,

lib/realtime/application.ex

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ defmodule Realtime.Application do
5252
region = Application.get_env(:realtime, :region)
5353
:syn.join(RegionNodes, region, self(), node: node())
5454

55+
broadcast_pool_size = Application.get_env(:realtime, :broadcast_pool_size, 10)
5556
migration_partition_slots = Application.get_env(:realtime, :migration_partition_slots)
5657
connect_partition_slots = Application.get_env(:realtime, :connect_partition_slots)
5758
no_channel_timeout_in_ms = Application.get_env(:realtime, :no_channel_timeout_in_ms)
@@ -65,7 +66,8 @@ defmodule Realtime.Application do
6566
Realtime.Repo,
6667
RealtimeWeb.Telemetry,
6768
{Cluster.Supervisor, [topologies, [name: Realtime.ClusterSupervisor]]},
68-
{Phoenix.PubSub, name: Realtime.PubSub, pool_size: 10},
69+
{Phoenix.PubSub,
70+
name: Realtime.PubSub, pool_size: 10, adapter: Realtime.GenRpcPubSub, broadcast_pool_size: broadcast_pool_size},
6971
{Cachex, name: Realtime.RateCounter},
7072
Realtime.Tenants.Cache,
7173
Realtime.RateCounter.DynamicSupervisor,

lib/realtime/gen_rpc.ex

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,22 @@ defmodule Realtime.GenRpc do
1010

1111
@type result :: any | {:error, :rpc_error, reason :: any}
1212

13+
@doc """
14+
Broadcasts the message `msg` asynchronously to the registered process `name` on the specified `nodes`.
15+
16+
Options:
17+
18+
- `:key` - Optional key to consistently select the same gen_rpc clients to guarantee message order between nodes
19+
"""
20+
@spec abcast([node], atom, any, keyword()) :: :ok
21+
def abcast(nodes, name, msg, opts) when is_list(nodes) and is_atom(name) and is_list(opts) do
22+
key = Keyword.get(opts, :key, nil)
23+
nodes = rpc_nodes(nodes, key)
24+
25+
:gen_rpc.abcast(nodes, name, msg)
26+
:ok
27+
end
28+
1329
@doc """
1430
Fire and forget apply(mod, func, args) on all nodes
1531

lib/realtime/gen_rpc/pub_sub.ex

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
defmodule Realtime.GenRpcPubSub do
2+
@moduledoc """
3+
gen_rpc Phoenix.PubSub adapter
4+
"""
5+
6+
@behaviour Phoenix.PubSub.Adapter
7+
alias Realtime.GenRpc
8+
use Supervisor
9+
10+
@impl true
11+
def node_name(_), do: node()
12+
13+
# Supervisor callbacks
14+
15+
def start_link(opts) do
16+
adapter_name = Keyword.fetch!(opts, :adapter_name)
17+
name = Keyword.fetch!(opts, :name)
18+
pool_size = Keyword.get(opts, :pool_size, 1)
19+
broadcast_pool_size = Keyword.get(opts, :broadcast_pool_size, pool_size)
20+
21+
Supervisor.start_link(__MODULE__, {adapter_name, name, broadcast_pool_size},
22+
name: :"#{name}#{adapter_name}_supervisor"
23+
)
24+
end
25+
26+
@impl true
27+
def init({adapter_name, pubsub, pool_size}) do
28+
workers = for number <- 1..pool_size, do: :"#{pubsub}#{adapter_name}_#{number}"
29+
30+
:persistent_term.put(adapter_name, List.to_tuple(workers))
31+
32+
children =
33+
for worker <- workers do
34+
Supervisor.child_spec({Realtime.GenRpcPubSub.Worker, {pubsub, worker}}, id: worker)
35+
end
36+
37+
Supervisor.init(children, strategy: :one_for_one)
38+
end
39+
40+
defp worker_name(adapter_name, key) do
41+
workers = :persistent_term.get(adapter_name)
42+
elem(workers, :erlang.phash2(key, tuple_size(workers)))
43+
end
44+
45+
@impl true
46+
def broadcast(adapter_name, topic, message, dispatcher) do
47+
worker = worker_name(adapter_name, self())
48+
GenRpc.abcast(Node.list(), worker, forward_to_local(topic, message, dispatcher), key: worker)
49+
end
50+
51+
@impl true
52+
def direct_broadcast(adapter_name, node_name, topic, message, dispatcher) do
53+
worker = worker_name(adapter_name, self())
54+
GenRpc.abcast([node_name], worker, forward_to_local(topic, message, dispatcher), key: worker)
55+
end
56+
57+
defp forward_to_local(topic, message, dispatcher), do: {:ftl, topic, message, dispatcher}
58+
end
59+
60+
defmodule Realtime.GenRpcPubSub.Worker do
61+
@moduledoc false
62+
use GenServer
63+
64+
@doc false
65+
def start_link({pubsub, worker}), do: GenServer.start_link(__MODULE__, pubsub, name: worker)
66+
67+
@impl true
68+
def init(pubsub), do: {:ok, pubsub}
69+
70+
@impl true
71+
def handle_info({:ftl, topic, message, dispatcher}, pubsub) do
72+
Phoenix.PubSub.local_broadcast(pubsub, topic, message, dispatcher)
73+
{:noreply, pubsub}
74+
end
75+
76+
@impl true
77+
def handle_info(_, pubsub), do: {:noreply, pubsub}
78+
end

lib/realtime_web/tenant_broadcaster.ex

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ defmodule RealtimeWeb.TenantBroadcaster do
99
def pubsub_broadcast(tenant_id, topic, message, dispatcher) do
1010
collect_payload_size(tenant_id, message)
1111

12-
Realtime.GenRpc.multicast(PubSub, :local_broadcast, [Realtime.PubSub, topic, message, dispatcher], key: topic)
12+
PubSub.broadcast(Realtime.PubSub, topic, message, dispatcher)
1313

1414
:ok
1515
end
@@ -25,13 +25,7 @@ defmodule RealtimeWeb.TenantBroadcaster do
2525
def pubsub_broadcast_from(tenant_id, from, topic, message, dispatcher) do
2626
collect_payload_size(tenant_id, message)
2727

28-
Realtime.GenRpc.multicast(
29-
PubSub,
30-
:local_broadcast_from,
31-
[Realtime.PubSub, from, topic, message, dispatcher],
32-
key: topic
33-
)
34-
28+
PubSub.broadcast_from(Realtime.PubSub, from, topic, message, dispatcher)
3529
:ok
3630
end
3731

mix.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.49.0",
7+
version: "2.50.0",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,
@@ -90,7 +90,7 @@ defmodule Realtime.MixProject do
9090
{:opentelemetry_phoenix, "~> 2.0"},
9191
{:opentelemetry_cowboy, "~> 1.0"},
9292
{:opentelemetry_ecto, "~> 1.2"},
93-
{:gen_rpc, git: "https://github.com/supabase/gen_rpc.git", ref: "d161cf263c661a534eaabf80aac7a34484dac772"},
93+
{:gen_rpc, git: "https://github.com/supabase/gen_rpc.git", ref: "5aea098b300a0a6ad13533e030230132cbe9ca2c"},
9494
{:mimic, "~> 1.0", only: :test},
9595
{:floki, ">= 0.30.0", only: :test},
9696
{:mint_web_socket, "~> 1.0", only: :test},

mix.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
"file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"},
3030
"finch": {:hex, :finch, "0.19.0", "c644641491ea854fc5c1bbaef36bfc764e3f08e7185e1f084e35e0672241b76d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "fc5324ce209125d1e2fa0fcd2634601c52a787aff1cd33ee833664a5af4ea2b6"},
3131
"floki": {:hex, :floki, "0.37.0", "b83e0280bbc6372f2a403b2848013650b16640cd2470aea6701f0632223d719e", [:mix], [], "hexpm", "516a0c15a69f78c47dc8e0b9b3724b29608aa6619379f91b1ffa47109b5d0dd3"},
32-
"gen_rpc": {:git, "https://github.com/supabase/gen_rpc.git", "d161cf263c661a534eaabf80aac7a34484dac772", [ref: "d161cf263c661a534eaabf80aac7a34484dac772"]},
32+
"gen_rpc": {:git, "https://github.com/supabase/gen_rpc.git", "5aea098b300a0a6ad13533e030230132cbe9ca2c", [ref: "5aea098b300a0a6ad13533e030230132cbe9ca2c"]},
3333
"gettext": {:hex, :gettext, "0.26.2", "5978aa7b21fada6deabf1f6341ddba50bc69c999e812211903b169799208f2a8", [:mix], [{:expo, "~> 0.5.1 or ~> 1.0", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "aa978504bcf76511efdc22d580ba08e2279caab1066b76bb9aa81c4a1e0a32a5"},
3434
"gproc": {:hex, :gproc, "0.9.1", "f1df0364423539cf0b80e8201c8b1839e229e5f9b3ccb944c5834626998f5b8c", [:rebar3], [], "hexpm", "905088e32e72127ed9466f0bac0d8e65704ca5e73ee5a62cb073c3117916d507"},
3535
"grpcbox": {:hex, :grpcbox, "0.17.1", "6e040ab3ef16fe699ffb513b0ef8e2e896da7b18931a1ef817143037c454bcce", [:rebar3], [{:acceptor_pool, "~> 1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~> 0.15.1", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~> 0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~> 0.9.1", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "4a3b5d7111daabc569dc9cbd9b202a3237d81c80bf97212fbc676832cb0ceb17"},
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Application.put_env(:phoenix_pubsub, :test_adapter, {Realtime.GenRpcPubSub, []})
2+
Code.require_file("../../deps/phoenix_pubsub/test/shared/pubsub_test.exs", __DIR__)

test/realtime/gen_rpc_test.exs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,39 @@ defmodule Realtime.GenRpcTest do
186186
end
187187
end
188188

189+
describe "abcast/4" do
190+
test "abcast to registered process", %{node: node} do
191+
name =
192+
System.unique_integer()
193+
|> to_string()
194+
|> String.to_atom()
195+
196+
:erlang.register(name, self())
197+
198+
# Use erpc to make the other node abcast to this one
199+
:erpc.call(node, GenRpc, :abcast, [[node()], name, "a message", []])
200+
201+
assert_receive "a message"
202+
refute_receive _any
203+
end
204+
205+
@tag extra_config: [{:gen_rpc, :tcp_server_port, 9999}]
206+
test "tcp error" do
207+
Logger.put_process_level(self(), :debug)
208+
209+
log =
210+
capture_log(fn ->
211+
assert GenRpc.abcast(Node.list(), :some_process_name, "a message", []) == :ok
212+
# We have to wait for gen_rpc logs to show up
213+
Process.sleep(100)
214+
end)
215+
216+
assert log =~ "[error] event=connect_to_remote_server"
217+
218+
refute_receive _any
219+
end
220+
end
221+
189222
describe "multicast/4" do
190223
test "evals everywhere" do
191224
parent = self()

0 commit comments

Comments
 (0)