Skip to content

Commit 95b4ad2

Browse files
authored
Introduce broadcast_pool_size option to allow safe pool size migration (#197)
1 parent 420a87a commit 95b4ad2

File tree

8 files changed

+201
-21
lines changed

8 files changed

+201
-21
lines changed

lib/phoenix/pubsub.ex

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ defmodule Phoenix.PubSub do
3434
It supports a `:pool_size` option to be given alongside
3535
the name, defaults to `1`. Note the `:pool_size` must
3636
be the same throughout the cluster, therefore don't
37-
configure the pool size based on `System.schedulers_online/1`,
37+
configure the pool size based on `System.schedulers_online/0`,
3838
especially if you are using machines with different specs.
3939
4040
* `Phoenix.PubSub.Redis` - uses Redis to exchange data between
@@ -59,6 +59,83 @@ defmodule Phoenix.PubSub do
5959
custom `value` to provide "fastlaning", allowing messages broadcast
6060
to thousands or even millions of users to be encoded once and written
6161
directly to sockets instead of being encoded per channel.
62+
63+
## Safe pool size migration (when using `Phoenix.PubSub.PG2` adapter)
64+
65+
When you need to change the pool size in a running cluster,
66+
you can use the `broadcast_pool_size` option to ensure no
67+
messages are lost during deployment. This is particularly
68+
important when increasing the pool size.
69+
70+
Here's how to safely increase the pool size from 1 to 2:
71+
72+
1. Initial state - Current configuration with `pool_size: 1`:
73+
```
74+
{Phoenix.PubSub, name: :my_pubsub, pool_size: 1}
75+
```
76+
77+
```mermaid
78+
graph TD
79+
subgraph "Initial State"
80+
subgraph "Node 1"
81+
A1[Shard 1<br/>Broadcast & Receive]
82+
end
83+
subgraph "Node 2"
84+
B1[Shard 1<br/>Broadcast & Receive]
85+
end
86+
A1 <--> B1
87+
end
88+
```
89+
90+
2. First deployment - Set the new pool size but keep broadcasting on the old size:
91+
```
92+
{Phoenix.PubSub, name: :my_pubsub, pool_size: 2, broadcast_pool_size: 1}
93+
```
94+
95+
```mermaid
96+
graph TD
97+
subgraph "First Deployment"
98+
subgraph "Node 1"
99+
A1[Shard 1<br/>Broadcast & Receive]
100+
A2[Shard 2<br/>Broadcast & Receive]
101+
end
102+
subgraph "Node 2"
103+
B1[Shard 1<br/>Broadcast & Receive]
104+
B2[Shard 2<br/>Receive Only]
105+
end
106+
A1 <--> B1
107+
A2 --> B2
108+
end
109+
```
110+
111+
3. Final deployment - All nodes running with new pool size:
112+
```
113+
{Phoenix.PubSub, name: :my_pubsub, pool_size: 2}
114+
```
115+
116+
```mermaid
117+
graph TD
118+
subgraph "Final State"
119+
subgraph "Node 1"
120+
A1[Shard 1<br/>Broadcast & Receive]
121+
A2[Shard 2<br/>Broadcast & Receive]
122+
end
123+
subgraph "Node 2"
124+
B1[Shard 1<br/>Broadcast & Receive]
125+
B2[Shard 2<br/>Broadcast & Receive]
126+
end
127+
A1 <--> B1
128+
A2 <--> B2
129+
end
130+
```
131+
132+
This two-step process ensures that:
133+
- All nodes can receive messages from both old and new pool sizes
134+
- No messages are lost during the transition
135+
- The cluster remains fully functional throughout the deployment
136+
137+
To decrease the pool size, follow the same process in reverse order.
138+
62139
"""
63140

64141
@type node_name :: atom | binary
@@ -87,6 +164,9 @@ defmodule Phoenix.PubSub do
87164
* `:adapter` - the adapter to use (defaults to `Phoenix.PubSub.PG2`)
88165
* `:pool_size` - number of pubsub partitions to launch
89166
(defaults to one partition for every 4 cores)
167+
* `:broadcast_pool_size` - number of pubsub partitions used for broadcasting messages
168+
(defaults to `:pool_size`). This option is used during pool size migrations to ensure
169+
no messages are lost. See the "Safe Pool Size Migration" section in the module documentation.
90170
91171
"""
92172
@spec child_spec(keyword) :: Supervisor.child_spec()

lib/phoenix/pubsub/pg2.ex

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,30 +61,42 @@ defmodule Phoenix.PubSub.PG2 do
6161
def start_link(opts) do
6262
name = Keyword.fetch!(opts, :name)
6363
pool_size = Keyword.get(opts, :pool_size, 1)
64-
adapter_name = Keyword.fetch!(opts, :adapter_name)
65-
Supervisor.start_link(__MODULE__, {name, adapter_name, pool_size}, name: :"#{adapter_name}_supervisor")
64+
broadcast_pool_size = Keyword.get(opts, :broadcast_pool_size, pool_size)
65+
66+
if pool_size < broadcast_pool_size do
67+
{:error, "the :pool_size option must be greater than or equal to the :broadcast_pool_size option"}
68+
else
69+
adapter_name = Keyword.fetch!(opts, :adapter_name)
70+
Supervisor.start_link(__MODULE__, {name, adapter_name, pool_size, broadcast_pool_size}, name: :"#{adapter_name}_supervisor")
71+
end
6672
end
6773

6874
@impl true
69-
def init({name, adapter_name, pool_size}) do
70-
[_ | groups] =
71-
for number <- 1..pool_size do
72-
:"#{adapter_name}_#{number}"
73-
end
75+
def init({name, adapter_name, pool_size, broadcast_pool_size}) do
7476

75-
# Use `adapter_name` for the first in the pool for backwards compatibility
76-
# with v2.0 when the pool_size is 1.
77-
groups = [adapter_name | groups]
77+
listener_groups = groups(adapter_name, pool_size)
78+
broadcast_groups = groups(adapter_name, broadcast_pool_size)
7879

79-
:persistent_term.put(adapter_name, List.to_tuple(groups))
80+
:persistent_term.put(adapter_name, List.to_tuple(broadcast_groups))
8081

8182
children =
82-
for group <- groups do
83+
for group <- listener_groups do
8384
Supervisor.child_spec({Phoenix.PubSub.PG2Worker, {name, group}}, id: group)
8485
end
8586

8687
Supervisor.init(children, strategy: :one_for_one)
8788
end
89+
90+
defp groups(adapter_name, pool_size) do
91+
[_ | groups] =
92+
for number <- 1..pool_size do
93+
:"#{adapter_name}_#{number}"
94+
end
95+
96+
# Use `adapter_name` for the first in the pool for backwards compatibility
97+
# with v2.0 when the pool_size is 1.
98+
[adapter_name | groups]
99+
end
88100
end
89101

90102
defmodule Phoenix.PubSub.PG2Worker do

mix.exs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,42 @@ defmodule Phoenix.PubSub.Mixfile do
4747
[
4848
main: "Phoenix.PubSub",
4949
source_ref: "v#{@version}",
50-
source_url: "https://github.com/phoenixframework/phoenix_pubsub"
50+
source_url: "https://github.com/phoenixframework/phoenix_pubsub",
51+
52+
before_closing_body_tag: %{
53+
html: """
54+
<script defer src="https://cdn.jsdelivr.net/npm/mermaid@11.6.0/dist/mermaid.min.js"></script>
55+
<script>
56+
let initialized = false;
57+
58+
window.addEventListener("exdoc:loaded", () => {
59+
if (!initialized) {
60+
mermaid.initialize({
61+
startOnLoad: false,
62+
theme: document.body.className.includes("dark") ? "dark" : "default"
63+
});
64+
initialized = true;
65+
}
66+
67+
let id = 0;
68+
for (const codeEl of document.querySelectorAll("pre code.mermaid")) {
69+
const preEl = codeEl.parentElement;
70+
const graphDefinition = codeEl.textContent;
71+
const graphEl = document.createElement("div");
72+
const graphId = "mermaid-graph-" + id++;
73+
74+
mermaid.render(graphId, graphDefinition).then(({svg, bindFunctions}) => {
75+
graphEl.innerHTML = svg;
76+
bindFunctions?.(graphEl);
77+
preEl.insertAdjacentElement("afterend", graphEl);
78+
preEl.remove();
79+
});
80+
}
81+
});
82+
</script>
83+
""",
84+
epub: ""
85+
}
5186
]
5287
end
5388
end

test/phoenix/distributed_pubsub_test.exs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ defmodule Phoenix.PubSub.DistributedTest do
55

66
@node1 :"node1@127.0.0.1"
77
@node2 :"node2@127.0.0.1"
8+
@node3 :"node3@127.0.0.1"
9+
@node4 :"node4@127.0.0.1"
810

911
setup config do
1012
{:ok, %{pubsub: Phoenix.PubSubTest, topic: Atom.to_string(config.test)}}
@@ -17,7 +19,7 @@ defmodule Phoenix.PubSub.DistributedTest do
1719
:ok = PubSub.broadcast(config.pubsub, config.topic, :ping)
1820
assert_receive {@node1, :ping}
1921
assert_receive {@node2, :ping}
20-
22+
2123
:ok = PubSub.broadcast(config.pubsub, config.topic, :ping)
2224
assert_receive {@node1, :ping}
2325
assert_receive {@node2, :ping}
@@ -46,4 +48,27 @@ defmodule Phoenix.PubSub.DistributedTest do
4648
:ok = PubSub.direct_broadcast!(@node2, config.pubsub, config.topic, :ping)
4749
refute_received {@node1, :ping}
4850
end
51+
52+
test "broadcast is received by other node that has pool_size > broadcast_pool_size", config do
53+
spy_on_pubsub(@node1, config.pubsub, self(), config.topic)
54+
# node3 has pool_size = 4, broadcast_pool_size = 1
55+
spy_on_pubsub(@node3, config.pubsub, self(), config.topic)
56+
:ok = PubSub.broadcast(config.pubsub, config.topic, :ping)
57+
assert_receive {@node3, :ping}
58+
end
59+
60+
test "broadcast is received by other node that was broadcast from node that has broadcast_pool_size < pool_size", config do
61+
# node4 has pool_size = 1, message is sent from node3 that has pool_size = 4, broadcast_pool_size = 1
62+
spy_on_pubsub(@node1, config.pubsub, self(), config.topic)
63+
spy_on_pubsub(@node2, config.pubsub, self(), config.topic)
64+
spy_on_pubsub(@node3, config.pubsub, self(), config.topic)
65+
spy_on_pubsub(@node4, config.pubsub, self(), config.topic)
66+
67+
broadcast_from_node(@node3, config.pubsub, config.topic, :ping)
68+
69+
assert_receive {@node1, :ping}
70+
assert_receive {@node2, :ping}
71+
assert_receive {@node3, :ping}
72+
assert_receive {@node4, :ping}
73+
end
4974
end

test/phoenix/pubsub_test.exs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,18 @@ defmodule Phoenix.PubSub.UnitTest do
88
assert Exception.message(exception) ==
99
"the :name option is required when starting Phoenix.PubSub"
1010
end
11+
12+
test "pool_size can't be smaller than broadcast_pool_size" do
13+
opts = [name: name(), pool_size: 1, broadcast_pool_size: 2]
14+
15+
{:error, {{:shutdown, {:failed_to_start_child, Phoenix.PubSub.PG2, message}}, _}} =
16+
start_supervised({Phoenix.PubSub, opts})
17+
18+
assert ^message = "the :pool_size option must be greater than or equal to the :broadcast_pool_size option"
19+
end
20+
21+
defp name do
22+
:"#{__MODULE__}_#{:crypto.strong_rand_bytes(8) |> Base.encode16()}"
23+
end
1124
end
1225
end

test/support/cluster.ex

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@ defmodule Phoenix.PubSub.Cluster do
1818
|> Enum.map(&Task.await(&1, 30_000))
1919
end
2020

21-
defp spawn_node(node_host) do
21+
defp spawn_node({node_host, opts}) do
2222
{:ok, node} = :slave.start(to_charlist("127.0.0.1"), node_name(node_host), inet_loader_args())
2323
add_code_paths(node)
2424
transfer_configuration(node)
2525
ensure_applications_started(node)
26-
start_pubsub(node)
26+
start_pubsub(node, opts)
2727
{:ok, node}
2828
end
29+
defp spawn_node(node_host) do
30+
spawn_node({node_host, []})
31+
end
2932

3033
defp rpc(node, module, function, args) do
3134
:rpc.block_call(node, module, function, args)
@@ -60,9 +63,10 @@ defmodule Phoenix.PubSub.Cluster do
6063
end
6164
end
6265

63-
defp start_pubsub(node) do
66+
defp start_pubsub(node, opts) do
67+
opts = [name: Phoenix.PubSubTest, pool_size: 4] |> Keyword.merge(opts)
6468
args = [
65-
[{Phoenix.PubSub, name: Phoenix.PubSubTest, pool_size: 1}],
69+
[{Phoenix.PubSub, opts}],
6670
[strategy: :one_for_one]
6771
]
6872

test/support/node_case.ex

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,4 +187,10 @@ defmodule Phoenix.PubSub.NodeCase do
187187
@timeout -> {pid, {:error, :timeout}}
188188
end
189189
end
190+
191+
def broadcast_from_node(node, pubsub, topic, message) do
192+
call_node(node, fn ->
193+
Phoenix.PubSub.broadcast(pubsub, topic, message)
194+
end)
195+
end
190196
end

test/test_helper.exs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,17 @@ Application.put_env(:phoenix_pubsub, :test_adapter, {Phoenix.PubSub.PG2, []})
33
exclude = Keyword.get(ExUnit.configuration(), :exclude, [])
44

55
Supervisor.start_link(
6-
[{Phoenix.PubSub, name: Phoenix.PubSubTest, pool_size: 1}],
6+
[{Phoenix.PubSub, name: Phoenix.PubSubTest, pool_size: 4}],
77
strategy: :one_for_one
88
)
99

1010
unless :clustered in exclude do
11-
Phoenix.PubSub.Cluster.spawn([:"node1@127.0.0.1", :"node2@127.0.0.1"])
11+
Phoenix.PubSub.Cluster.spawn([
12+
:"node1@127.0.0.1",
13+
:"node2@127.0.0.1",
14+
{:"node3@127.0.0.1", pool_size: 4, broadcast_pool_size: 1},
15+
{:"node4@127.0.0.1", pool_size: 1}
16+
])
1217
end
1318

1419
ExUnit.start()

0 commit comments

Comments
 (0)