Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
264 changes: 161 additions & 103 deletions lib/phoenix/sync/electric.ex
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,10 @@ defmodule Phoenix.Sync.Electric do
@doc false
@impl Phoenix.Sync.Adapter
def children(env, opts) do
{mode, electric_opts} = electric_opts(opts)
{mode, electric_opts} =
opts
|> set_environment_defaults(env)
|> electric_opts(env)

case mode do
:disabled ->
Expand All @@ -158,7 +161,11 @@ defmodule Phoenix.Sync.Electric do
@doc false
@impl Phoenix.Sync.Adapter
def plug_opts(env, opts) do
{mode, electric_opts} = electric_opts(opts)
{mode, electric_opts} =
opts
|> set_environment_defaults(env)
|> electric_opts(env)

# don't need to validate the mode here -- it will have already been
# validated by children/0 which is run at phoenix_sync startup before the
# plug opts call even comes through
Expand All @@ -179,7 +186,10 @@ defmodule Phoenix.Sync.Electric do
@doc false
@impl Phoenix.Sync.Adapter
def client(env, opts) do
{mode, electric_opts} = electric_opts(opts)
{mode, electric_opts} =
opts
|> set_environment_defaults(env)
|> electric_opts(env)

case mode do
mode when mode in @client_valid_modes ->
Expand All @@ -192,40 +202,74 @@ defmodule Phoenix.Sync.Electric do
end
end

# if we want to set up per-run configuration, and avoid weird state errors in
# dev and test, then we have to write them to the application config, because
# `children/2`, `client/2` and `plug_opts/2` need to have consistent
# configuration values.
defp set_environment_defaults(opts, :test) do
opts
|> set_persistent_config(:stack_id, fn ->
"electric-stack#{System.monotonic_time()}"
end)
|> set_persistent_config(:replication_stream_id, fn ->
String.replace("phoenix_sync#{System.monotonic_time()}", "-", "_")
end)
|> set_persistent_config(:replication_slot_temporary?, true)
end

defp set_environment_defaults(opts, :dev) do
opts
|> set_environment_defaults(:prod)
|> set_persistent_config(:storage_dir, fn ->
Path.join([System.tmp_dir!(), "phoenix-sync#{System.monotonic_time()}"])
end)
end

defp set_environment_defaults(opts, _env) do
opts
|> set_persistent_config(:stack_id, "electric-embedded")
end

defp set_persistent_config(opts, key, value_fun) when is_function(value_fun) do
Keyword.put_new_lazy(opts, key, fn ->
value = value_fun.()
Application.put_env(:phoenix_sync, key, value)
value
end)
end

defp set_persistent_config(opts, key, value) do
set_persistent_config(opts, key, fn -> value end)
end

@doc false
def electric_available? do
@electric_available?
end

defp electric_opts(opts) do
defp electric_opts(opts, env) do
Keyword.pop_lazy(opts, :mode, fn ->
if electric_available?() do
Logger.warning([
"missing mode configuration for :phoenix_sync. Electric is installed so assuming `embedded` mode"
])

:embedded
else
Logger.warning("No `:mode` configuration for :phoenix_sync, assuming `:disabled`")

:disabled
end
default_mode(env)
end)
end

defp electric_api_server(opts) do
config = electric_http_config(opts)
defp default_mode(:test) do
:disabled
end

cond do
Code.ensure_loaded?(Bandit) ->
Electric.Application.api_server(Bandit, config)
if @electric_available? do
defp default_mode(_env) do
Logger.warning([
"missing mode configuration for :phoenix_sync. Electric is installed so assuming `embedded` mode"
])

Code.ensure_loaded?(Plug.Cowboy) ->
Electric.Application.api_server(Plug.Cowboy, config)
:embedded
end
else
defp default_mode(_env) do
Logger.warning("No `:mode` configuration for :phoenix_sync, assuming `:disabled`")

true ->
raise RuntimeError,
message: "No HTTP server found. Please install either Bandit or Plug.Cowboy"
:disabled
end
end

Expand All @@ -236,15 +280,18 @@ defmodule Phoenix.Sync.Electric do
end
end

defp plug_opts(env, :embedded, electric_opts) do
if electric_available?() do
if @electric_available? do
defp plug_opts(env, :embedded, electric_opts) do
env
|> core_configuration(electric_opts)
|> Electric.Application.api_plug_opts()
|> Keyword.fetch!(:api)
else
raise RuntimeError,
message: "Configured for embedded mode but `:electric` dependency not installed"
end
else
defp plug_opts(_env, :embedded, _electric_opts) do
raise ArgumentError,
message:
"phoenix_sync configured in `mode: :embedded` but electric not installed. Please add `:electric` to your dependencies or use `:http` mode."
end
end

Expand All @@ -253,42 +300,73 @@ defmodule Phoenix.Sync.Electric do
end

defp embedded_children(env, mode, opts) do
electric_children(env, mode, opts)
end

defp electric_children(env, mode, opts) do
case validate_database_config(env, mode, opts) do
{:start, db_config_fun, message} ->
if electric_available?() do
db_config =
db_config_fun.()
|> Keyword.update!(:connection_opts, &Electric.Utils.obfuscate_password/1)
start_embedded(env, mode, db_config_fun, message)

electric_config = core_configuration(env, db_config)
:ignore ->
{:ok, []}

Logger.info(message)
{:error, _} = error ->
error
end
end

http_server =
case mode do
:http -> electric_api_server(electric_config)
:embedded -> []
end
if @electric_available? do
defp start_embedded(env, mode, db_config_fun, message) do
db_config =
db_config_fun.()
|> Keyword.update!(:connection_opts, &Electric.Utils.obfuscate_password/1)

{:ok,
[
{Electric.StackSupervisor, Electric.Application.configuration(electric_config)}
| http_server
]}
else
{:error,
"Electric configured to start in embedded mode but :electric dependency not available"}
electric_config = core_configuration(env, db_config)

Logger.info(message)

http_server =
case mode do
:http -> electric_api_server(electric_config)
:embedded -> []
end

:ignore ->
{:ok, []}
{:ok,
[
{Electric.StackSupervisor, Electric.Application.configuration(electric_config)}
| http_server
]}
end

{:error, _} = error ->
error
defp electric_api_server(opts) do
config = electric_http_config(opts)

cond do
Code.ensure_loaded?(Bandit) ->
Electric.Application.api_server(Bandit, config)

Code.ensure_loaded?(Plug.Cowboy) ->
Electric.Application.api_server(Plug.Cowboy, config)

true ->
raise RuntimeError,
message: "No HTTP server found. Please install either Bandit or Plug.Cowboy"
end
end

defp electric_http_config(opts) do
case Keyword.fetch(opts, :http) do
{:ok, http_opts} ->
opts
|> then(fn o ->
if(port = http_opts[:port], do: Keyword.put(o, :service_port, port), else: o)
end)

:error ->
opts
end
end
else
defp start_embedded(_env, _mode, _db_config_fun, _message) do
{:error,
"Electric configured to start in embedded mode but :electric dependency not available"}
end
end

Expand All @@ -299,8 +377,8 @@ defmodule Phoenix.Sync.Electric do
defp core_configuration(env, opts) do
opts
|> env_defaults(env)
|> overrides()
|> stack_id()
|> overrides()
end

defp env_defaults(opts, :dev) do
Expand All @@ -310,24 +388,11 @@ defmodule Phoenix.Sync.Electric do
# if we want to use emphemeral dir for dev storage then we have to persist
# the storage_dir into the application config.
opts
# |> Keyword.put_new(
# :storage_dir,
# Path.join(System.tmp_dir!(), "electric/shape-data#{System.monotonic_time()}")
# )
|> Keyword.put_new(
:storage,
{Electric.ShapeCache.InMemoryStorage,
table_base_name: :"electric-storage#{opts[:stack_id]}", stack_id: opts[:stack_id]}
)
|> Keyword.put_new(
:persistent_kv,
{Electric.PersistentKV.Memory, :new!, []}
)
|> Keyword.put_new(:send_cache_headers?, false)
end

defp env_defaults(opts, :test) do
stack_id = "electric-stack#{System.monotonic_time()}"
stack_id = "electric-stack"

opts = Keyword.put_new(opts, :stack_id, stack_id)

Expand Down Expand Up @@ -389,26 +454,32 @@ defmodule Phoenix.Sync.Electric do
end
end

defp convert_repo_config(repo_config) do
expected_keys = Electric.connection_opts_schema() |> Keyword.keys()

ssl_opts =
case Keyword.get(repo_config, :ssl, nil) do
off when off in [nil, false] -> [sslmode: :disable]
true -> [sslmode: :require]
_opts -> []
end
if @electric_available? do
defp convert_repo_config(repo_config) do
expected_keys = Electric.connection_opts_schema() |> Keyword.keys()

ssl_opts =
case Keyword.get(repo_config, :ssl, nil) do
off when off in [nil, false] -> [sslmode: :disable]
true -> [sslmode: :require]
_opts -> []
end

tcp_opts =
if :inet6 in Keyword.get(repo_config, :socket_options, []),
do: [ipv6: true],
else: []
tcp_opts =
if :inet6 in Keyword.get(repo_config, :socket_options, []),
do: [ipv6: true],
else: []

repo_config
|> Keyword.take(expected_keys)
|> Keyword.merge(ssl_opts)
|> Keyword.merge(tcp_opts)
|> Keyword.put_new(:port, 5432)
repo_config
|> Keyword.take(expected_keys)
|> Keyword.merge(ssl_opts)
|> Keyword.merge(tcp_opts)
|> Keyword.put_new(:port, 5432)
end
else
defp convert_repo_config(_repo_config) do
[]
end
end

defp http_mode_plug_opts(electric_config) do
Expand All @@ -426,19 +497,6 @@ defmodule Phoenix.Sync.Electric do
end
end

defp electric_http_config(opts) do
case Keyword.fetch(opts, :http) do
{:ok, http_opts} ->
opts
|> then(fn o ->
if(port = http_opts[:port], do: Keyword.put(o, :service_port, port), else: o)
end)

:error ->
opts
end
end

if @electric_available? do
defp configure_client(opts, :embedded) do
Electric.Client.embedded(opts)
Expand Down
2 changes: 1 addition & 1 deletion lib/phoenix/sync/plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ defmodule Phoenix.Sync.Plug do
end

"""
@spec send_configuration(Plug.Conn.t(), Phoenix.Sync.shape_definition(), Client.t()) ::
@spec send_configuration(Plug.Conn.t(), Phoenix.Sync.shape_definition(), Electric.Client.t()) ::
Plug.Conn.t()
def send_configuration(conn, shape_or_queryable, client \\ Phoenix.Sync.client!()) do
shape = normalise_shape(shape_or_queryable)
Expand Down
Loading