Skip to content

Commit 5f0dab1

Browse files
committed
Add support for tracking and querying whether the phoenic pubsub notifications module has successfully subscribed to the notifications topic.
1 parent a13e9d3 commit 5f0dab1

File tree

2 files changed

+61
-12
lines changed

2 files changed

+61
-12
lines changed

lib/fun_with_flags/notifications/phoenix_pubsub.ex

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,20 @@ defmodule FunWithFlags.Notifications.PhoenixPubSub do
3737
unique_id
3838
end
3939

40+
# Get the pubsub subscription status for the current note, which tells us if
41+
# the GenServer for this module has successfully completed the Phoenix.PubSub
42+
# subscription procedure to the change notification topic.
43+
#
44+
# The GenServer might still be unsubscribed if this is called very early
45+
# after the application has started. (i.e. in some unit tests), but in general
46+
# a runtime exception is raised if subscribing is not completed within a few
47+
# seconds.
48+
#
49+
def subscribed? do
50+
{:ok, subscription_status} = GenServer.call(__MODULE__, :get_subscription_status)
51+
subscription_status == :subscribed
52+
end
53+
4054

4155
def publish_change(flag_name) do
4256
Logger.debug fn -> "FunWithFlags.Notifications: publish change for '#{flag_name}'" end
@@ -55,8 +69,8 @@ defmodule FunWithFlags.Notifications.PhoenixPubSub do
5569
# The unique_id will become the state of the GenServer
5670
#
5771
def init(unique_id) do
58-
subscribe(1)
59-
{:ok, unique_id}
72+
subscription_status = subscribe(1)
73+
{:ok, {unique_id, subscription_status}}
6074
end
6175

6276

@@ -66,18 +80,20 @@ defmodule FunWithFlags.Notifications.PhoenixPubSub do
6680
:ok ->
6781
# All good
6882
Logger.debug fn -> "FunWithFlags: Connected to Phoenix.PubSub process #{inspect(client())}" end
69-
:ok
83+
:subscribed
7084
{:error, reason} ->
7185
# Handled application errors
7286
Logger.debug fn -> "FunWithFlags: Cannot subscribe to Phoenix.PubSub process #{inspect(client())} ({:error, #{inspect(reason)}})." end
7387
try_again_to_subscribe(attempt)
88+
:unsubscribed
7489
end
7590
rescue
7691
e ->
7792
# The pubsub process was probably not running. This happens when using it in Phoenix, as it tries to connect the
7893
# first time while the application is booting, and the Phoenix.PubSub process is not fully started yet.
7994
Logger.debug fn -> "FunWithFlags: Cannot subscribe to Phoenix.PubSub process #{inspect(client())} (exception: #{inspect(e)})." end
8095
try_again_to_subscribe(attempt)
96+
:unsubscribed
8197
end
8298
end
8399

@@ -96,31 +112,47 @@ defmodule FunWithFlags.Notifications.PhoenixPubSub do
96112
end
97113

98114

99-
def handle_call(:get_unique_id, _from, unique_id) do
100-
{:reply, {:ok, unique_id}, unique_id}
115+
def handle_call(:get_unique_id, _from, state = {unique_id, _subscription_status}) do
116+
{:reply, {:ok, unique_id}, state}
117+
end
118+
119+
def handle_call(:get_subscription_status, _from, state = {_unique_id, subscription_status}) do
120+
{:reply, {:ok, subscription_status}, state}
121+
end
122+
123+
# Test helper
124+
#
125+
def handle_call({:test_helper_set_subscription_status, new_subscription_status}, _from, {unique_id, _current_subscription_status}) do
126+
{:reply, :ok, {unique_id, new_subscription_status}}
101127
end
102128

103129

104-
def handle_info({:fwf_changes, {:updated, _name, unique_id}}, unique_id) do
130+
def handle_info({:fwf_changes, {:updated, _name, unique_id}}, state = {unique_id, _subscription_status}) do
105131
# received my own message, doing nothing
106-
{:noreply, unique_id}
132+
{:noreply, state}
107133
end
108134

109-
def handle_info({:fwf_changes, {:updated, name, _}}, unique_id) do
135+
def handle_info({:fwf_changes, {:updated, name, _}}, state) do
110136
# received message from another node, reload the flag
111137
Logger.debug fn -> "FunWithFlags: received change notification for flag '#{name}'" end
112138
Task.start(Store, :reload, [name])
113-
{:noreply, unique_id}
139+
{:noreply, state}
114140
end
115141

116142

117143
# When subscribing to the pubsub process fails, the process sends itself a delayed message
118144
# to try again. It will be handled here.
119145
#
120-
def handle_info({:subscribe_retry, attempt}, unique_id) do
146+
def handle_info({:subscribe_retry, attempt}, state = {unique_id, _subscription_status}) do
121147
Logger.debug fn -> "FunWithFlags: retrying to subscribe to Phoenix.PubSub, attempt #{attempt}." end
122-
subscribe(attempt)
123-
{:noreply, unique_id}
148+
case subscribe(attempt) do
149+
:subscribed ->
150+
Logger.debug fn -> "FunWithFlags: updating Phoenix.PubSub's subscription status to :subscribed." end
151+
{:noreply, {unique_id, :subscribed}}
152+
_ ->
153+
# don't change the state
154+
{:noreply, state}
155+
end
124156
end
125157

126158
defp client, do: Config.pubsub_client()

test/fun_with_flags/notifications/phoenix_pubsub_test.exs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,23 @@ defmodule FunWithFlags.Notifications.PhoenixPubSubTest do
2424
end
2525
end
2626

27+
describe "subscribed?()" do
28+
test "it returns true if the GenServer is subscribed to the pubsub topic" do
29+
assert :ok = GenServer.call(PubSub, {:test_helper_set_subscription_status, :subscribed})
30+
assert true = PubSub.subscribed?()
31+
32+
# Kill the process to restore its normal state.
33+
kill_process(PubSub)
34+
end
35+
36+
test "it returns false if the GenServer is not subscribed to the pubsub topic" do
37+
assert :ok = GenServer.call(PubSub, {:test_helper_set_subscription_status, :unsubscribed})
38+
assert false == PubSub.subscribed?()
39+
40+
# Kill the process to restore its normal state.
41+
kill_process(PubSub)
42+
end
43+
end
2744

2845
describe "publish_change(flag_name)" do
2946
setup do

0 commit comments

Comments
 (0)