Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* Add support for Elixir 1.18. Drop support for Elixir 1.15. Elixir >= 1.16 is now required. Dropping support for older versions of Elixir simply means that this package is no longer tested with them in CI, and that compatibility issues are not considered bugs.
* Drop support for Erlang/OTP 24, and Erlang/OTP >= 25 is now required. Dropping support for older versions of Erlang/OTP simply means that this package is not tested with them in CI, and that no compatibility issues are considered bugs.
* Improve how the change-notification Phoenix.PubSub adapter manages its connection and readiness status. ([pull/191](https://github.com/tompave/fun_with_flags/pull/191))

## v1.12.0

Expand Down
56 changes: 44 additions & 12 deletions lib/fun_with_flags/notifications/phoenix_pubsub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@ defmodule FunWithFlags.Notifications.PhoenixPubSub do
unique_id
end

# Get the pubsub subscription status for the current note, which tells us if
# the GenServer for this module has successfully completed the Phoenix.PubSub
# subscription procedure to the change notification topic.
#
# The GenServer might still be unsubscribed if this is called very early
# after the application has started. (i.e. in some unit tests), but in general
# a runtime exception is raised if subscribing is not completed within a few
# seconds.
#
def subscribed? do
{:ok, subscription_status} = GenServer.call(__MODULE__, :get_subscription_status)
subscription_status == :subscribed
end


def publish_change(flag_name) do
Logger.debug fn -> "FunWithFlags.Notifications: publish change for '#{flag_name}'" end
Expand All @@ -55,8 +69,8 @@ defmodule FunWithFlags.Notifications.PhoenixPubSub do
# The unique_id will become the state of the GenServer
#
def init(unique_id) do
subscribe(1)
{:ok, unique_id}
subscription_status = subscribe(1)
{:ok, {unique_id, subscription_status}}
end


Expand All @@ -66,18 +80,20 @@ defmodule FunWithFlags.Notifications.PhoenixPubSub do
:ok ->
# All good
Logger.debug fn -> "FunWithFlags: Connected to Phoenix.PubSub process #{inspect(client())}" end
:ok
:subscribed
{:error, reason} ->
# Handled application errors
Logger.debug fn -> "FunWithFlags: Cannot subscribe to Phoenix.PubSub process #{inspect(client())} ({:error, #{inspect(reason)}})." end
try_again_to_subscribe(attempt)
:unsubscribed
end
rescue
e ->
# The pubsub process was probably not running. This happens when using it in Phoenix, as it tries to connect the
# first time while the application is booting, and the Phoenix.PubSub process is not fully started yet.
Logger.debug fn -> "FunWithFlags: Cannot subscribe to Phoenix.PubSub process #{inspect(client())} (exception: #{inspect(e)})." end
try_again_to_subscribe(attempt)
:unsubscribed
end
end

Expand All @@ -96,31 +112,47 @@ defmodule FunWithFlags.Notifications.PhoenixPubSub do
end


def handle_call(:get_unique_id, _from, unique_id) do
{:reply, {:ok, unique_id}, unique_id}
def handle_call(:get_unique_id, _from, state = {unique_id, _subscription_status}) do
{:reply, {:ok, unique_id}, state}
end

def handle_call(:get_subscription_status, _from, state = {_unique_id, subscription_status}) do
{:reply, {:ok, subscription_status}, state}
end

# Test helper
#
def handle_call({:test_helper_set_subscription_status, new_subscription_status}, _from, {unique_id, _current_subscription_status}) do
{:reply, :ok, {unique_id, new_subscription_status}}
end


def handle_info({:fwf_changes, {:updated, _name, unique_id}}, unique_id) do
def handle_info({:fwf_changes, {:updated, _name, unique_id}}, state = {unique_id, _subscription_status}) do
# received my own message, doing nothing
{:noreply, unique_id}
{:noreply, state}
end

def handle_info({:fwf_changes, {:updated, name, _}}, unique_id) do
def handle_info({:fwf_changes, {:updated, name, _}}, state) do
# received message from another node, reload the flag
Logger.debug fn -> "FunWithFlags: received change notification for flag '#{name}'" end
Task.start(Store, :reload, [name])
{:noreply, unique_id}
{:noreply, state}
end


# When subscribing to the pubsub process fails, the process sends itself a delayed message
# to try again. It will be handled here.
#
def handle_info({:subscribe_retry, attempt}, unique_id) do
def handle_info({:subscribe_retry, attempt}, state = {unique_id, _subscription_status}) do
Logger.debug fn -> "FunWithFlags: retrying to subscribe to Phoenix.PubSub, attempt #{attempt}." end
subscribe(attempt)
{:noreply, unique_id}
case subscribe(attempt) do
:subscribed ->
Logger.debug fn -> "FunWithFlags: updating Phoenix.PubSub's subscription status to :subscribed." end
{:noreply, {unique_id, :subscribed}}
_ ->
# don't change the state
{:noreply, state}
end
end

defp client, do: Config.pubsub_client()
Expand Down
83 changes: 61 additions & 22 deletions test/fun_with_flags/notifications/phoenix_pubsub_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,30 @@ defmodule FunWithFlags.Notifications.PhoenixPubSubTest do
end
end

describe "subscribed?()" do
test "it returns true if the GenServer is subscribed to the pubsub topic" do
assert :ok = GenServer.call(PubSub, {:test_helper_set_subscription_status, :subscribed})
assert true = PubSub.subscribed?()

# Kill the process to restore its normal state.
kill_process(PubSub)
wait_until_pubsub_is_ready!()
end

test "it returns false if the GenServer is not subscribed to the pubsub topic" do
assert :ok = GenServer.call(PubSub, {:test_helper_set_subscription_status, :unsubscribed})
assert false == PubSub.subscribed?()

# Kill the process to restore its normal state.
kill_process(PubSub)
wait_until_pubsub_is_ready!()
end
end

describe "publish_change(flag_name)" do
setup do
wait_until_pubsub_is_ready!()

{:ok, name: unique_atom()}
end

Expand All @@ -42,15 +63,16 @@ defmodule FunWithFlags.Notifications.PhoenixPubSubTest do
{Phoenix.PubSub, [:passthrough], []}
]) do
assert {:ok, _pid} = PubSub.publish_change(name)
:timer.sleep(10)

assert called(
Phoenix.PubSub.broadcast!(
:fwf_test,
"fun_with_flags_changes",
{:fwf_changes, {:updated, name, u_id}}
assert_with_retries(fn ->
assert called(
Phoenix.PubSub.broadcast!(
:fwf_test,
"fun_with_flags_changes",
{:fwf_changes, {:updated, name, u_id}}
)
)
)
end)
end
end

Expand Down Expand Up @@ -83,14 +105,16 @@ defmodule FunWithFlags.Notifications.PhoenixPubSubTest do
channel = "fun_with_flags_changes"
message = {:fwf_changes, {:updated, :foobar, u_id}}

wait_until_pubsub_is_ready!()

with_mock(PubSub, [:passthrough], []) do
Phoenix.PubSub.broadcast!(client, channel, message)

:timer.sleep(1)

assert called(
PubSub.handle_info(message, u_id)
)
assert_with_retries(fn ->
assert called(
PubSub.handle_info(message, {u_id, :subscribed})
)
end)
end
end

Expand All @@ -105,10 +129,14 @@ defmodule FunWithFlags.Notifications.PhoenixPubSubTest do
channel = "fun_with_flags_changes"
message = {:fwf_changes, {:updated, :a_flag_name, u_id}}

wait_until_pubsub_is_ready!()

with_mock(Store, [:passthrough], []) do
Phoenix.PubSub.broadcast!(client, channel, message)
:timer.sleep(30)
refute called(Store.reload(:a_flag_name))

assert_with_retries(fn ->
refute called(Store.reload(:a_flag_name))
end)
end
end

Expand All @@ -121,10 +149,14 @@ defmodule FunWithFlags.Notifications.PhoenixPubSubTest do
channel = "fun_with_flags_changes"
message = {:fwf_changes, {:updated, :a_flag_name, another_u_id}}

wait_until_pubsub_is_ready!()

with_mock(Store, [:passthrough], []) do
Phoenix.PubSub.broadcast!(client, channel, message)
:timer.sleep(30)
assert called(Store.reload(:a_flag_name))

assert_with_retries(fn ->
assert called(Store.reload(:a_flag_name))
end)
end
end
end
Expand All @@ -143,12 +175,15 @@ defmodule FunWithFlags.Notifications.PhoenixPubSubTest do
cached_flag = %Flag{name: name, gates: [gate2]}

{:ok, ^stored_flag} = Config.persistence_adapter.put(name, gate)
:timer.sleep(10)
{:ok, ^cached_flag} = Cache.put(cached_flag)
assert_with_retries(fn ->
{:ok, ^cached_flag} = Cache.put(cached_flag)
end)

assert {:ok, ^stored_flag} = Config.persistence_adapter.get(name)
assert {:ok, ^cached_flag} = Cache.get(name)

wait_until_pubsub_is_ready!()

{:ok, name: name, stored_flag: stored_flag, cached_flag: cached_flag}
end

Expand All @@ -166,8 +201,10 @@ defmodule FunWithFlags.Notifications.PhoenixPubSubTest do
message = {:fwf_changes, {:updated, name, u_id}}

Phoenix.PubSub.broadcast!(client, channel, message)
:timer.sleep(30)
assert {:ok, ^cached_flag} = Cache.get(name)

assert_with_retries(fn ->
assert {:ok, ^cached_flag} = Cache.get(name)
end)
end


Expand All @@ -181,8 +218,10 @@ defmodule FunWithFlags.Notifications.PhoenixPubSubTest do

assert {:ok, ^cached_flag} = Cache.get(name)
Phoenix.PubSub.broadcast!(client, channel, message)
:timer.sleep(30)
assert {:ok, ^stored_flag} = Cache.get(name)

assert_with_retries(fn ->
assert {:ok, ^stored_flag} = Cache.get(name)
end)
end
end
end
50 changes: 50 additions & 0 deletions test/support/test_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,54 @@ defmodule FunWithFlags.TestUtils do
def on_elixir_15? do
Version.match?(System.version, ">= 1.15.0")
end

def phx_pubsub_ready? do
try do
Process.whereis(FunWithFlags.Notifications.PhoenixPubSub) &&
FunWithFlags.Notifications.PhoenixPubSub.subscribed?
catch
:exit, _reason ->
# This is to catch failures when the GenServer is still recovering from `Process.exit(:kill)`,
# as in that case this function might fail with:
# (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
#
# I'm not entirely sure about the sequencing here. I'd suppose that `Process.whereis()` should
# protect us from that, but likely there is a race condition somewhere so that the GenServer is
# exited/killed after the `whereis()` call has returned a truthy value.

# IO.puts "EXIT while checking for Phoenix Pubsub readiness: #{inspect reason}"
false
end
end

def wait_until_pubsub_is_ready!(attempts \\ 20, wait_time_ms \\ 25)

def wait_until_pubsub_is_ready!(attempts, wait_time_ms) when attempts > 0 do
case phx_pubsub_ready?() do
true ->
:ok
_ ->
:timer.sleep(wait_time_ms)
wait_until_pubsub_is_ready!(attempts - 1, wait_time_ms)
end
end

def wait_until_pubsub_is_ready!(_, _) do
raise "Phoenix PubSub is never ready, giving up"
end

def assert_with_retries(attempts \\ 30, wait_time_ms \\ 25, test_fn) do
try do
test_fn.()
rescue
e ->
if attempts == 1 do
reraise e, __STACKTRACE__
else
IO.write("|")
:timer.sleep(wait_time_ms)
assert_with_retries(attempts - 1, wait_time_ms, test_fn)
end
end
end
end
Loading