diff --git a/apps/plug_sync/.formatter.exs b/apps/plug_sync/.formatter.exs new file mode 100644 index 0000000..4890992 --- /dev/null +++ b/apps/plug_sync/.formatter.exs @@ -0,0 +1,5 @@ +# Used by "mix format" +[ + import_deps: [:plug, :ecto, :phoenix_sync], + inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] +] diff --git a/apps/plug_sync/.gitignore b/apps/plug_sync/.gitignore new file mode 100644 index 0000000..d353c8e --- /dev/null +++ b/apps/plug_sync/.gitignore @@ -0,0 +1,25 @@ +# The directory Mix will write compiled artifacts to. +/_build/ + +# If you run "mix test --cover", coverage assets end up here. +/cover/ + +# The directory Mix downloads your dependencies sources to. +/deps/ + +# Where third-party dependencies like ExDoc output generated docs. +/doc/ + +# If the VM crashes, it generates a dump, let's ignore it too. +erl_crash.dump + +# Also ignore archive artifacts (built via "mix archive.build"). +*.ez + +# Ignore package tarball (built via "mix hex.build"). +plug_sync-*.tar + +# Temporary files, for example, from tests. +/tmp/ + +/persistent/ diff --git a/apps/plug_sync/README.md b/apps/plug_sync/README.md new file mode 100644 index 0000000..1116a58 --- /dev/null +++ b/apps/plug_sync/README.md @@ -0,0 +1,7 @@ +# PlugSync + +A test Plug app for easy validation of Phoenix.Sync features within a real app. + +Use `PHOENIX_SYNC_MODE` to set the interaction mode, either `"http"` or `"embedded"`. + +In `http` mode, you should run an Electric server on port 3000. 
diff --git a/apps/plug_sync/config/config.exs b/apps/plug_sync/config/config.exs new file mode 100644 index 0000000..d1186fe --- /dev/null +++ b/apps/plug_sync/config/config.exs @@ -0,0 +1,3 @@ +import Config + +import_config "#{config_env()}.exs" diff --git a/apps/plug_sync/config/dev.exs b/apps/plug_sync/config/dev.exs new file mode 100644 index 0000000..5110239 --- /dev/null +++ b/apps/plug_sync/config/dev.exs @@ -0,0 +1,35 @@ +import Config + +config :plug_sync, ecto_repos: [PlugSync.Repo], start_server: false + +case System.get_env("PHOENIX_SYNC_MODE", "embedded") do + "http" -> + IO.puts("Starting in HTTP mode") + + config :phoenix_sync, + mode: :http, + env: config_env(), + url: "http://localhost:3000" + + config :plug_sync, PlugSync.Repo, + username: "postgres", + password: "password", + hostname: "localhost", + database: "electric", + port: 54321 + + _ -> + IO.puts("Starting in embedded mode") + + config :phoenix_sync, + mode: :embedded, + env: config_env(), + repo: PlugSync.Repo + + config :plug_sync, PlugSync.Repo, + username: "postgres", + password: "password", + hostname: "localhost", + database: "plug_sync", + port: 55555 +end diff --git a/apps/plug_sync/config/prod.exs b/apps/plug_sync/config/prod.exs new file mode 100644 index 0000000..becde76 --- /dev/null +++ b/apps/plug_sync/config/prod.exs @@ -0,0 +1 @@ +import Config diff --git a/apps/plug_sync/config/test.exs b/apps/plug_sync/config/test.exs new file mode 100644 index 0000000..3f2fd40 --- /dev/null +++ b/apps/plug_sync/config/test.exs @@ -0,0 +1,3 @@ +import Config + +config :phoenix_sync, mode: :sandbox, env: config_env() diff --git a/apps/plug_sync/lib/mix/tasks/plug_sync.insert.ex b/apps/plug_sync/lib/mix/tasks/plug_sync.insert.ex new file mode 100644 index 0000000..40fe564 --- /dev/null +++ b/apps/plug_sync/lib/mix/tasks/plug_sync.insert.ex @@ -0,0 +1,37 @@ +defmodule Mix.Tasks.PlugSync.Insert do + use Mix.Task + + @shortdoc "Inserts sample data into the database for testing PlugSync" + + 
alias PlugSync.Repo + alias PlugSync.Tasks.Task + alias PlugSync.Tasks.Step + + def run(args) do + {opts, _, _} = OptionParser.parse(args, strict: [rows: :integer, data_size: :integer]) + + # {:ok, repo} = Repo.start_link() + Application.ensure_all_started(:plug_sync) + + data_size = Keyword.get(opts, :data_size, 1024) + rows = Keyword.get(opts, :rows, 1024) + + Repo.transaction(fn -> + + items = Stream.repeatedly(fn -> item(data_size) end) |> Enum.take(rows) + Repo.insert_all(PlugSync.Item, items) + + end) + + IO.puts("Sample data inserted successfully.") + end + + defp item(data_size) do + %{ + value: Enum.random(1..100_000), + data: :crypto.strong_rand_bytes(div(data_size, 2)) |> Base.encode16(), + inserted_at: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second), + updated_at: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second) + } + end +end diff --git a/apps/plug_sync/lib/mix/tasks/plug_sync.server.ex b/apps/plug_sync/lib/mix/tasks/plug_sync.server.ex new file mode 100644 index 0000000..0ae4ca6 --- /dev/null +++ b/apps/plug_sync/lib/mix/tasks/plug_sync.server.ex @@ -0,0 +1,18 @@ +defmodule Mix.Tasks.PlugSync.Server do + use Mix.Task + + @shortdoc "Starts a PlugSync server" + + def run(_args) do + Application.put_env(:plug_sync, :server, true, persistent: true) + Mix.Tasks.Run.run(run_args()) + end + + defp run_args do + if iex_running?(), do: [], else: ["--no-halt"] + end + + defp iex_running? do + Code.ensure_loaded?(IEx) and IEx.started?() + end +end diff --git a/apps/plug_sync/lib/plug_sync.ex b/apps/plug_sync/lib/plug_sync.ex new file mode 100644 index 0000000..d282694 --- /dev/null +++ b/apps/plug_sync/lib/plug_sync.ex @@ -0,0 +1,18 @@ +defmodule PlugSync do + @moduledoc """ + Documentation for `PlugSync`. + """ + + @doc """ + Hello world. 
+ + ## Examples + + iex> PlugSync.hello() + :world + + """ + def hello do + :world + end +end diff --git a/apps/plug_sync/lib/plug_sync/application.ex b/apps/plug_sync/lib/plug_sync/application.ex new file mode 100644 index 0000000..69de1af --- /dev/null +++ b/apps/plug_sync/lib/plug_sync/application.ex @@ -0,0 +1,29 @@ +defmodule PlugSync.Application do + # See https://hexdocs.pm/elixir/Application.html + # for more information on OTP Applications + @moduledoc false + + use Application + + @impl true + def start(_type, _args) do + server = + if Application.get_env(:plug_sync, :server, false) do + [ + {Bandit, plug: {PlugSync.Router, phoenix_sync: Phoenix.Sync.plug_opts()}, port: 4444} + ] + else + [] + end + + children = + [ + PlugSync.Repo + ] ++ server + + # See https://hexdocs.pm/elixir/Supervisor.html + # for other strategies and supported options + opts = [strategy: :one_for_one, name: PlugSync.Supervisor] + Supervisor.start_link(children, opts) + end +end diff --git a/apps/plug_sync/lib/plug_sync/item.ex b/apps/plug_sync/lib/plug_sync/item.ex new file mode 100644 index 0000000..4cbf58a --- /dev/null +++ b/apps/plug_sync/lib/plug_sync/item.ex @@ -0,0 +1,11 @@ +defmodule PlugSync.Item do + use Ecto.Schema + + schema "items" do + field :name, :string + field :value, :integer + field :data, :string + + timestamps() + end +end diff --git a/apps/plug_sync/lib/plug_sync/repo.ex b/apps/plug_sync/lib/plug_sync/repo.ex new file mode 100644 index 0000000..b0fe4df --- /dev/null +++ b/apps/plug_sync/lib/plug_sync/repo.ex @@ -0,0 +1,7 @@ +defmodule PlugSync.Repo do + use Phoenix.Sync.Sandbox.Postgres + + use Ecto.Repo, + otp_app: :plug_sync, + adapter: Phoenix.Sync.Sandbox.Postgres.adapter() +end diff --git a/apps/plug_sync/lib/plug_sync/router.ex b/apps/plug_sync/lib/plug_sync/router.ex new file mode 100644 index 0000000..3208fd6 --- /dev/null +++ b/apps/plug_sync/lib/plug_sync/router.ex @@ -0,0 +1,19 @@ +defmodule PlugSync.Router do + use Plug.Router, 
copy_opts_to_assign: :options + use Phoenix.Sync.Router + + plug :match + plug :dispatch + + sync "/items-mapped", table: "items", transform: &PlugSync.Router.map_item/1 + + match _ do + send_resp(conn, 404, "not found") + end + + def map_item(item) do + [ + Map.update!(item, "value", fn value -> Map.update!(value, "name", fn name -> "#{name} mapped #{value["id"]}" end) end) + ] + end +end diff --git a/apps/plug_sync/mix.exs b/apps/plug_sync/mix.exs new file mode 100644 index 0000000..0653537 --- /dev/null +++ b/apps/plug_sync/mix.exs @@ -0,0 +1,34 @@ +defmodule PlugSync.MixProject do + use Mix.Project + + def project do + [ + app: :plug_sync, + version: "0.1.0", + elixir: "~> 1.18", + start_permanent: Mix.env() == :prod, + deps: deps() + ] + end + + # Run "mix help compile.app" to learn about applications. + def application do + [ + extra_applications: [:logger], + mod: {PlugSync.Application, []} + ] + end + + # Run "mix help deps" to learn about dependencies. + defp deps do + [ + {:plug, "~> 1.0"}, + {:bandit, "~> 1.0"}, + {:postgrex, "~> 0.21"}, + {:ecto_sql, "~> 3.0"}, + {:electric, "~> 1.1.2"}, + {:phoenix_sync, [path: "../..", override: true]}, + {:igniter, "~> 0.6"} + ] + end +end diff --git a/apps/plug_sync/mix.lock b/apps/plug_sync/mix.lock new file mode 100644 index 0000000..65bc7fc --- /dev/null +++ b/apps/plug_sync/mix.lock @@ -0,0 +1,47 @@ +%{ + "backoff": {:hex, :backoff, "1.1.6", "83b72ed2108ba1ee8f7d1c22e0b4a00cfe3593a67dbc792799e8cce9f42f796b", [:rebar3], [], "hexpm", "cf0cfff8995fb20562f822e5cc47d8ccf664c5ecdc26a684cbe85c225f9d7c39"}, + "bandit": {:hex, :bandit, "1.8.0", "c2e93d7e3c5c794272fa4623124f827c6f24b643acc822be64c826f9447d92fb", [:mix], [{:hpax, "~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.18", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", 
[hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "8458ff4eed20ff2a2ea69d4854883a077c33ea42b51f6811b044ceee0fa15422"}, + "combine": {:hex, :combine, "0.10.0", "eff8224eeb56498a2af13011d142c5e7997a80c8f5b97c499f84c841032e429f", [:mix], [], "hexpm", "1b1dbc1790073076580d0d1d64e42eae2366583e7aecd455d1215b0d16f2451b"}, + "db_connection": {:hex, :db_connection, "2.8.0", "64fd82cfa6d8e25ec6660cea73e92a4cbc6a18b31343910427b702838c4b33b2", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "008399dae5eee1bf5caa6e86d204dcb44242c82b1ed5e22c881f2c34da201b15"}, + "decimal": {:hex, :decimal, "2.3.0", "3ad6255aa77b4a3c4f818171b12d237500e63525c2fd056699967a3e7ea20f62", [:mix], [], "hexpm", "a4d66355cb29cb47c3cf30e71329e58361cfcb37c34235ef3bf1d7bf3773aeac"}, + "dotenvy": {:hex, :dotenvy, "1.1.0", "316aee89c11a4ec8be3d74a69d17d17ea2e21e633e0cac9f155cf420e237ccb4", [:mix], [], "hexpm", "0519bda67fdfa1c22279c2654b2f292485f0caae7360fe29205f74f28a93df18"}, + "ecto": {:hex, :ecto, "3.13.2", "7d0c0863f3fc8d71d17fc3ad3b9424beae13f02712ad84191a826c7169484f01", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "669d9291370513ff56e7b7e7081b7af3283d02e046cf3d403053c557894a0b3e"}, + "ecto_sql": {:hex, :ecto_sql, "3.13.2", "a07d2461d84107b3d037097c822ffdd36ed69d1cf7c0f70e12a3d1decf04e2e1", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.13.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.7", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.19 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", 
optional: false]}], "hexpm", "539274ab0ecf1a0078a6a72ef3465629e4d6018a3028095dc90f60a19c371717"}, + "electric": {:hex, :electric, "1.1.9", "b0a7774556bf306ffe1b4e4ceb270d0e9dfafdd38044b3ab9f98db8e5ead7bb4", [:mix], [{:backoff, "~> 1.1", [hex: :backoff, repo: "hexpm", optional: false]}, {:bandit, "~> 1.6", [hex: :bandit, repo: "hexpm", optional: false]}, {:dotenvy, "~> 1.1", [hex: :dotenvy, repo: "hexpm", optional: false]}, {:ecto, "~> 3.12", [hex: :ecto, repo: "hexpm", optional: false]}, {:electric_cubdb, "~> 2.0", [hex: :electric_cubdb, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.5", [hex: :opentelemetry, repo: "hexpm", optional: true]}, {:opentelemetry_exporter, "~> 1.8", [hex: :opentelemetry_exporter, repo: "hexpm", optional: true]}, {:opentelemetry_semantic_conventions, "~> 1.27", [hex: :opentelemetry_semantic_conventions, repo: "hexpm", optional: false]}, {:opentelemetry_telemetry, "~> 1.1", [hex: :opentelemetry_telemetry, repo: "hexpm", optional: false]}, {:otel_metric_exporter, "~> 0.3.11", [hex: :otel_metric_exporter, repo: "hexpm", optional: true]}, {:pg_query_ex, "0.9.0", [hex: :pg_query_ex, repo: "hexpm", optional: false]}, {:plug, "~> 1.17", [hex: :plug, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.20", [hex: :postgrex, repo: "hexpm", optional: false]}, {:remote_ip, "~> 1.2", [hex: :remote_ip, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}, {:retry, "~> 0.19", [hex: :retry, repo: "hexpm", optional: false]}, {:sentry, "~> 11.0", [hex: :sentry, repo: "hexpm", optional: true]}, {:stream_split, "~> 0.1", [hex: :stream_split, repo: "hexpm", optional: false]}, {:telemetry_metrics_prometheus_core, "~> 1.1", [hex: :telemetry_metrics_prometheus_core, repo: "hexpm", optional: true]}, {:telemetry_metrics_statsd, "~> 0.7", [hex: 
:telemetry_metrics_statsd, repo: "hexpm", optional: true]}, {:telemetry_poller, "~> 1.2", [hex: :telemetry_poller, repo: "hexpm", optional: false]}, {:tls_certificate_check, "~> 1.27", [hex: :tls_certificate_check, repo: "hexpm", optional: false]}, {:tz, "~> 0.28", [hex: :tz, repo: "hexpm", optional: false]}], "hexpm", "cd8beea1d005424fd24aa863e85a707e49b770accd23e66066f6c86524643464"}, + "electric_client": {:hex, :electric_client, "0.7.2", "06f221fa7379d41ab4fb771c9cf78f26654d7c265f61faffa8c31e6b73073224", [:mix], [{:ecto_sql, "~> 3.12", [hex: :ecto_sql, repo: "hexpm", optional: true]}, {:electric, "~> 1.1.1", [hex: :electric, repo: "hexpm", optional: true]}, {:gen_stage, "~> 1.2", [hex: :gen_stage, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "4036796cc21767917f1c1c72541b0865a5585b9b4a59ccdb15b99af9c457c97e"}, + "electric_cubdb": {:hex, :electric_cubdb, "2.0.2", "36f86e3c52dc26f4e077a49fbef813b1a38d3897421cece851f149190b34c16c", [:mix], [], "hexpm", "0c0e24b31fb76ad1b33c5de2ab35c41a4ff9da153f5c1f9b15e2de78575acaf2"}, + "elixir_make": {:hex, :elixir_make, "0.9.0", "6484b3cd8c0cee58f09f05ecaf1a140a8c97670671a6a0e7ab4dc326c3109726", [:mix], [], "hexpm", "db23d4fd8b757462ad02f8aa73431a426fe6671c80b200d9710caf3d1dd0ffdb"}, + "finch": {:hex, :finch, "0.20.0", "5330aefb6b010f424dcbbc4615d914e9e3deae40095e73ab0c1bb0968933cadf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", 
"2658131a74d051aabfcba936093c903b8e89da9a1b63e430bee62045fa9b2ee2"}, + "glob_ex": {:hex, :glob_ex, "0.1.11", "cb50d3f1ef53f6ca04d6252c7fde09fd7a1cf63387714fe96f340a1349e62c93", [:mix], [], "hexpm", "342729363056e3145e61766b416769984c329e4378f1d558b63e341020525de4"}, + "hpax": {:hex, :hpax, "1.0.3", "ed67ef51ad4df91e75cc6a1494f851850c0bd98ebc0be6e81b026e765ee535aa", [:mix], [], "hexpm", "8eab6e1cfa8d5918c2ce4ba43588e894af35dbd8e91e6e55c817bca5847df34a"}, + "igniter": {:hex, :igniter, "0.6.28", "9db10192f19f10b924f14c805f5b2ad992617fccaff9cf9582b7f065d562d4d8", [:mix], [{:glob_ex, "~> 0.1.7", [hex: :glob_ex, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:owl, "~> 0.11", [hex: :owl, repo: "hexpm", optional: false]}, {:phx_new, "~> 1.7", [hex: :phx_new, repo: "hexpm", optional: true]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}, {:rewrite, ">= 1.1.1 and < 2.0.0-0", [hex: :rewrite, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.4", [hex: :sourceror, repo: "hexpm", optional: false]}, {:spitfire, ">= 0.1.3 and < 1.0.0-0", [hex: :spitfire, repo: "hexpm", optional: false]}], "hexpm", "ad9369d626aeca21079ef17661a2672fb32598610c5e5bccae2537efd36b27d4"}, + "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, + "mime": {:hex, :mime, "2.0.7", "b8d739037be7cd402aee1ba0306edfdef982687ee7e9859bee6198c1e7e2f128", [:mix], [], "hexpm", "6171188e399ee16023ffc5b76ce445eb6d9672e2e241d2df6050f3c771e80ccd"}, + "mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", 
"fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"}, + "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, + "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, + "opentelemetry_api": {:hex, :opentelemetry_api, "1.4.1", "e071429a37441a0fe9097eeea0ff921ebadce8eba8e1ce297b05a43c7a0d121f", [:mix, :rebar3], [], "hexpm", "39bdb6ad740bc13b16215cb9f233d66796bbae897f3bf6eb77abb712e87c3c26"}, + "opentelemetry_semantic_conventions": {:hex, :opentelemetry_semantic_conventions, "1.27.0", "acd0194a94a1e57d63da982ee9f4a9f88834ae0b31b0bd850815fe9be4bbb45f", [:mix, :rebar3], [], "hexpm", "9681ccaa24fd3d810b4461581717661fd85ff7019b082c2dff89c7d5b1fc2864"}, + "opentelemetry_telemetry": {:hex, :opentelemetry_telemetry, "1.1.2", "410ab4d76b0921f42dbccbe5a7c831b8125282850be649ee1f70050d3961118a", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.3", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "641ab469deb181957ac6d59bce6e1321d5fe2a56df444fc9c19afcad623ab253"}, + "owl": {:hex, :owl, "0.12.2", "65906b525e5c3ef51bab6cba7687152be017aebe1da077bb719a5ee9f7e60762", [:mix], [{:ucwidth, "~> 0.2", [hex: :ucwidth, repo: "hexpm", optional: true]}], "hexpm", "6398efa9e1fea70a04d24231e10dcd66c1ac1aa2da418d20ef5357ec61de2880"}, + "pg_query_ex": {:hex, :pg_query_ex, "0.9.0", "8e34bd2d0e0eb9e8d621c4697032fad4bfba46826950d3b46904a80ab589b43a", [:make, :mix], [{:elixir_make, "~> 0.4", [hex: :elixir_make, repo: "hexpm", optional: false]}, {:protox, "~> 2.0", [hex: :protox, repo: "hexpm", optional: false]}], "hexpm", "a3fada1704fa9e2bc11ff846ad545ef9a1d34f46d86206063c37128960f4f5f5"}, + "plug": 
{:hex, :plug, "1.18.1", "5067f26f7745b7e31bc3368bc1a2b818b9779faa959b49c934c17730efc911cf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "57a57db70df2b422b564437d2d33cf8d33cd16339c1edb190cd11b1a3a546cc2"}, + "plug_crypto": {:hex, :plug_crypto, "2.1.1", "19bda8184399cb24afa10be734f84a16ea0a2bc65054e23a62bb10f06bc89491", [:mix], [], "hexpm", "6470bce6ffe41c8bd497612ffde1a7e4af67f36a15eea5f921af71cf3e11247c"}, + "postgrex": {:hex, :postgrex, "0.21.1", "2c5cc830ec11e7a0067dd4d623c049b3ef807e9507a424985b8dcf921224cd88", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "27d8d21c103c3cc68851b533ff99eef353e6a0ff98dc444ea751de43eb48bdac"}, + "protox": {:hex, :protox, "2.0.4", "2a86ae3699696c5d92e15804968ce6a6827a8d9516d0bbabcf16584dec710ae1", [:mix], [], "hexpm", "8ac5a03bb84da4c75d76dc29cd46008081c2068ad0f6f0da4c051093d6e24c01"}, + "remote_ip": {:hex, :remote_ip, "1.2.0", "fb078e12a44414f4cef5a75963c33008fe169b806572ccd17257c208a7bc760f", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "2ff91de19c48149ce19ed230a81d377186e4412552a597d6a5137373e5877cb7"}, + "req": {:hex, :req, "0.5.15", "662020efb6ea60b9f0e0fac9be88cd7558b53fe51155a2d9899de594f9906ba9", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, 
"~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "a6513a35fad65467893ced9785457e91693352c70b58bbc045b47e5eb2ef0c53"}, + "retry": {:hex, :retry, "0.19.0", "aeb326d87f62295d950f41e1255fe6f43280a1b390d36e280b7c9b00601ccbc2", [:mix], [], "hexpm", "85ef376aa60007e7bff565c366310966ec1bd38078765a0e7f20ec8a220d02ca"}, + "rewrite": {:hex, :rewrite, "1.1.2", "f5a5d10f5fed1491a6ff48e078d4585882695962ccc9e6c779bae025d1f92eda", [:mix], [{:glob_ex, "~> 0.1", [hex: :glob_ex, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.0", [hex: :sourceror, repo: "hexpm", optional: false]}, {:text_diff, "~> 0.1", [hex: :text_diff, repo: "hexpm", optional: false]}], "hexpm", "7f8b94b1e3528d0a47b3e8b7bfeca559d2948a65fa7418a9ad7d7712703d39d4"}, + "sourceror": {:hex, :sourceror, "1.10.0", "38397dedbbc286966ec48c7af13e228b171332be1ad731974438c77791945ce9", [:mix], [], "hexpm", "29dbdfc92e04569c9d8e6efdc422fc1d815f4bd0055dc7c51b8800fb75c4b3f1"}, + "spitfire": {:hex, :spitfire, "0.2.1", "29e154873f05444669c7453d3d931820822cbca5170e88f0f8faa1de74a79b47", [:mix], [], "hexpm", "6eeed75054a38341b2e1814d41bb0a250564092358de2669fdb57ff88141d91b"}, + "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, + "stream_split": {:hex, :stream_split, "0.1.7", "2d3fd1fd21697da7f91926768d65f79409086052c9ec7ae593987388f52425f8", [:mix], [], "hexpm", "1dc072ff507a64404a0ad7af90df97096183fee8eeac7b300320cea7c4679147"}, + "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", 
"7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, + "telemetry_poller": {:hex, :telemetry_poller, "1.3.0", "d5c46420126b5ac2d72bc6580fb4f537d35e851cc0f8dbd571acf6d6e10f5ec7", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "51f18bed7128544a50f75897db9974436ea9bfba560420b646af27a9a9b35211"}, + "text_diff": {:hex, :text_diff, "0.1.0", "1caf3175e11a53a9a139bc9339bd607c47b9e376b073d4571c031913317fecaa", [:mix], [], "hexpm", "d1ffaaecab338e49357b6daa82e435f877e0649041ace7755583a0ea3362dbd7"}, + "thousand_island": {:hex, :thousand_island, "1.4.1", "8df065e627407e281f7935da5ad0f3842d10eb721afa92e760b720d71e2e37aa", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "204a8640e5d2818589b87286ae66160978628d7edf6095181cbe0440765fb6c1"}, + "tls_certificate_check": {:hex, :tls_certificate_check, "1.29.0", "4473005eb0bbdad215d7083a230e2e076f538d9ea472c8009fd22006a4cfc5f6", [:rebar3], [{:ssl_verify_fun, "~> 1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "5b0d0e5cb0f928bc4f210df667304ed91c5bff2a391ce6bdedfbfe70a8f096c5"}, + "tz": {:hex, :tz, "0.28.1", "717f5ffddfd1e475e2a233e221dc0b4b76c35c4b3650b060c8e3ba29dd6632e9", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:mint, "~> 1.6", [hex: :mint, repo: "hexpm", optional: true]}], "hexpm", "bfdca1aa1902643c6c43b77c1fb0cb3d744fd2f09a8a98405468afdee0848c8a"}, + "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, +} diff --git a/apps/plug_sync/priv/repo/migrations/20250828121415_create_items.exs b/apps/plug_sync/priv/repo/migrations/20250828121415_create_items.exs new file mode 100644 index 0000000..81ea8d3 --- /dev/null +++ b/apps/plug_sync/priv/repo/migrations/20250828121415_create_items.exs @@ -0,0 +1,13 @@ 
+defmodule PlugSync.Repo.Migrations.CreateItems do + use Ecto.Migration + + def change do + create table(:items) do + add :name, :string + add :value, :integer + add :data, :text + + timestamps() + end + end +end diff --git a/apps/plug_sync/test/plug_sync_test.exs b/apps/plug_sync/test/plug_sync_test.exs new file mode 100644 index 0000000..28bf9cb --- /dev/null +++ b/apps/plug_sync/test/plug_sync_test.exs @@ -0,0 +1,8 @@ +defmodule PlugSyncTest do + use ExUnit.Case + doctest PlugSync + + test "greets the world" do + assert PlugSync.hello() == :world + end +end diff --git a/apps/plug_sync/test/test_helper.exs b/apps/plug_sync/test/test_helper.exs new file mode 100644 index 0000000..869559e --- /dev/null +++ b/apps/plug_sync/test/test_helper.exs @@ -0,0 +1 @@ +ExUnit.start() diff --git a/lib/phoenix/sync.ex b/lib/phoenix/sync.ex index 610ff74..1dd872a 100644 --- a/lib/phoenix/sync.ex +++ b/lib/phoenix/sync.ex @@ -11,6 +11,7 @@ defmodule Phoenix.Sync do @shape_keys [:namespace, :where, :columns] @shape_params @shape_keys |> Enum.map(&to_string/1) + @json if(Code.ensure_loaded?(Jason), do: Jason, else: JSON) @type queryable() :: Ecto.Queryable.t() | Ecto.Schema.t() | Ecto.Changeset.t() @type shape_specification :: [ @@ -87,6 +88,9 @@ defmodule Phoenix.Sync do defdelegate client!(), to: Phoenix.Sync.Client, as: :new! + @doc false + def json_library, do: @json + _ = """ Use request query parameters to create a `Electric.Client.ShapeDefinition`. @@ -328,6 +332,119 @@ defmodule Phoenix.Sync do params: [false] ) + ## Transforms + + Using the `transform` option it's possible to modify the sync messages before + they are sent to the clients via the [`sync`](`Phoenix.Sync.Router.sync/3`) + router macro or [`sync_render`](`Phoenix.Sync.Controller.sync_render/4`) + within your controllers. 
+ + Phoenix.Sync.shape!( + table: "todos", + transform: &MyApp.Todos.transform/1 + ) + + The transform function is passed the change messages in raw form and can + transform the messages to a limited extent as required by the application. + + The `transform` process is effectively a `Stream.flat_map/2` operation over + the sync messages, so if you want to use pattern matching to perform some + kind of additional filtering operation to remove messages, e.g. based on some + authorization logic, then you can simply return an empty list: + + # don't send delete messages to the client + def transform(%{"headers" => %{"operation" => "delete"}}), do: [] + def transform(message), do: [message] + + Removing messages from the stream can be useful for cases where you want to + perform additional runtime filtering for authorization reasons that you're + not able to do at the database level. Be aware that this can impact + consistency of the client state so is an advanced feature that should be used + with care. + + The messages passed to the transform function are of the form: + + %{ + "key" => key, + "headers" => %{"operation" => operation, ...}, + "value" => %{"column_name" => column_value, ...} + } + + - `key` is a unique identifier for the row formed of the namespaced table + name plus the values of the row's primary key(s). **DO NOT MODIFY THIS + VALUE**. + + - `headers` is a map of metadata about the change. The `operation` key will + have one of the values `"insert"`, `"update"` or `"delete"`. You should leave + this as-is unless you have a very good reason to modify it. + + - `value` is the actual row data. Unless the shape is defined with `replica: + :full` only `insert` operations will contain the full row data. `update` + operations will only contain the columns that have changed and `delete` + operations will only contain the primary key columns. 
+ + You can modify the values of the `value` map, as required by your application, + but you should only modify values in a way that's compatible with the + column's datatype. E.g. don't concat an integer column with a string (unless + the resulting string will parse as an integer...). It is also unwise to + modify the primary key values of any row unless you can be sure not to cause + conflicts. Any column values you add that aren't in the backing Postgres + table will be passed through to the client as-is. + + When using the raw [`stream/2`](`Phoenix.Sync.Client.stream/2`) function to + receive a sync stream directly, the `transform` option is unnecessary and + hence ignored. You should use the functions available in `Enum` and `Stream` + to perform any data transforms. + + ### Transform via Ecto.Schema + + If you have custom field types in your `Ecto.Schema` module you can set up a + transform that passes the raw data from the replication stream through the + `Ecto` load machinery to ensure that the sync stream values match the values + you would see when using `Ecto` to load data directly from the database. 
+ + To do this pass the `Ecto.Schema` module as the transform function: + + Phoenix.Sync.shape!( + MyApp.Todos.Todo, + transform: MyApp.Todos.Todo + ) + + or in a route: + + sync "todos", MyApp.Todos.Todo, + transform: MyApp.Todos.Todo + + For this to work you need to implement `Jason.Encoder` for your schema module + (or `JSON.Encoder` if you're on Elixir >= 1.18 but if [`Jason`](https://hex.pm/packages/jason) is available + then it will be used), e.g.: + + defmodule MyApp.Todos.Todo do + use Ecto.Schema + + @derive {Jason.Encoder, except: [:__meta__]} + + schema "todos" do + field :title, :string + field :completed, :boolean, default: false + end + end + + > #### Effect of `transform` on server load {: .warning} + > + > Normally `Phoenix.Sync` simply passes the raw encoded JSON message stream + > from the backend server straight to the clients, which puts very little load + > on the application server. + > + > The `transform` mechanism requires intercepting, decoding, mutating and + > re-encoding every message from the backend server before they are sent to the + > client. This could be costly for busy shapes or lots of connected clients. + + ### Limitations + + See the documentation of `Phoenix.Sync.Router.sync/3` for additional + constraints on transform functions when defined within a route. 
+ ## Options When defining a shape via a keyword list, it supports the following options: diff --git a/lib/phoenix/sync/electric.ex b/lib/phoenix/sync/electric.ex index 032c1ab..63a2e46 100644 --- a/lib/phoenix/sync/electric.ex +++ b/lib/phoenix/sync/electric.ex @@ -589,6 +589,40 @@ defmodule Phoenix.Sync.Electric do |> Plug.Conn.send_resp(response.status, Enum.into(response.body, [])) end end + + @json Phoenix.Sync.json_library() + + @doc false + def map_response_body(body, nil) do + body + end + + # empty body is a valid response but not valid JSON + def map_response_body("", _mapper) do + "" + end + + def map_response_body(body, mapper) when is_binary(body) and is_function(mapper, 1) do + body + |> @json.decode!() + |> map_response_body(mapper) + |> then(fn item -> [@json.encode_to_iodata!(item)] end) + end + + def map_response_body(msgs, mapper) when is_list(msgs) and is_function(mapper, 1) do + msgs + |> Enum.flat_map(fn + %{"key" => _key, "headers" => _, "value" => _} = msg -> + mapper.(msg) + + control -> + [control] + end) + end + + def map_response_body(msgs, _mapper) do + msgs + end end if Code.ensure_loaded?(Electric.Shapes.Api) do @@ -596,9 +630,10 @@ if Code.ensure_loaded?(Electric.Shapes.Api) do alias Electric.Shapes alias Phoenix.Sync.PredefinedShape + alias Phoenix.Sync.Electric.ApiAdapter def predefined_shape(api, %PredefinedShape{} = shape) do - Shapes.Api.predefined_shape(api, PredefinedShape.to_api_params(shape)) + ApiAdapter.new(api, shape) end def call(api, %{method: "GET"} = conn, params) do diff --git a/lib/phoenix/sync/electric/api_adapter.ex b/lib/phoenix/sync/electric/api_adapter.ex new file mode 100644 index 0000000..89492f4 --- /dev/null +++ b/lib/phoenix/sync/electric/api_adapter.ex @@ -0,0 +1,65 @@ +if Code.ensure_loaded?(Electric.Shapes.Api) do + defmodule Phoenix.Sync.Electric.ApiAdapter do + @moduledoc false + + defstruct [:api, :shape] + + alias Phoenix.Sync.PredefinedShape + alias Electric.Shapes + + def new(%Shapes.Api{} = api, 
%PredefinedShape{} = predefined_shape) do + with {:ok, configured_api} <- + Shapes.Api.predefined_shape(api, PredefinedShape.to_api_params(predefined_shape)) do + {:ok, %__MODULE__{api: configured_api, shape: predefined_shape}} + end + end + + defimpl Phoenix.Sync.Adapter.PlugApi do + alias Phoenix.Sync.Electric.ApiAdapter + + def predefined_shape(_api, %PredefinedShape{} = _shape) do + raise ArgumentError, + message: "#{inspect(__MODULE__)} does not support nested predefined shapes" + end + + def call(%ApiAdapter{api: api, shape: shape}, %{method: "GET"} = conn, params) do + if transform_fun = PredefinedShape.transform_fun(shape) do + case Shapes.Api.validate(api, params) do + {:ok, request} -> + response = Shapes.Api.serve_shape_log(request) + response = Map.update!(response, :body, &apply_transform(&1, transform_fun)) + + conn + |> content_type() + |> Plug.Conn.assign(:request, request) + |> Plug.Conn.assign(:response, response) + |> Shapes.Api.Response.send(response) + + {:error, response} -> + conn + |> content_type() + |> Shapes.Api.Response.send(response) + |> Plug.Conn.halt() + end + else + Phoenix.Sync.Adapter.PlugApi.call(api, conn, params) + end + end + + def call(%ApiAdapter{api: api}, conn, params) do + Phoenix.Sync.Adapter.PlugApi.call(api, conn, params) + end + + defp content_type(conn) do + Plug.Conn.put_resp_content_type(conn, "application/json") + end + + defp apply_transform(stream, transform_fun) do + stream + |> Enum.to_list() + |> IO.iodata_to_binary() + |> Phoenix.Sync.Electric.map_response_body(transform_fun) + end + end + end +end diff --git a/lib/phoenix/sync/electric/client_adapter.ex b/lib/phoenix/sync/electric/client_adapter.ex index 4370b2c..e0d1750 100644 --- a/lib/phoenix/sync/electric/client_adapter.ex +++ b/lib/phoenix/sync/electric/client_adapter.ex @@ -21,7 +21,7 @@ defmodule Phoenix.Sync.Electric.ClientAdapter do # this is the server-defined shape route, so we want to only pass on the # per-request/stream position params leaving 
the shape-definition params # from the configured client. - def call(%{shape_definition: %PredefinedShape{}} = sync_client, conn, params) do + def call(%{shape_definition: %PredefinedShape{} = shape} = sync_client, conn, params) do request = Client.request( sync_client.client, @@ -32,7 +32,7 @@ defmodule Phoenix.Sync.Electric.ClientAdapter do next_cursor: params["cursor"] ) - fetch_upstream(sync_client, conn, request) + fetch_upstream(sync_client, conn, request, shape) end # this version is the pure client-defined shape version @@ -44,22 +44,32 @@ defmodule Phoenix.Sync.Electric.ClientAdapter do params: params ) - fetch_upstream(sync_client, conn, request) + fetch_upstream(sync_client, conn, request, nil) end defp normalise_method(method), do: method |> String.downcase() |> String.to_atom() defp live?(live), do: live == "true" - defp fetch_upstream(sync_client, conn, request) do + defp fetch_upstream(sync_client, conn, request, shape) do response = case Client.Fetch.request(sync_client.client, request) do %Client.Fetch.Response{} = response -> response {:error, %Client.Fetch.Response{} = response} -> response end + body = + if response.status == 200 do + Phoenix.Sync.Electric.map_response_body( + response.body, + PredefinedShape.transform_fun(shape) + ) + else + response.body + end + conn |> put_headers(response.headers) - |> Plug.Conn.send_resp(response.status, response.body) + |> Plug.Conn.send_resp(response.status, body) end defp put_headers(conn, headers) do diff --git a/lib/phoenix/sync/predefined_shape.ex b/lib/phoenix/sync/predefined_shape.ex index bc777ad..1f58ad5 100644 --- a/lib/phoenix/sync/predefined_shape.ex +++ b/lib/phoenix/sync/predefined_shape.ex @@ -6,28 +6,45 @@ defmodule Phoenix.Sync.PredefinedShape do alias Electric.Client.ShapeDefinition + @api_schema_opts [ + storage: [type: {:or, [:map, nil]}] + ] + + @sync_schema_opts [ + transform: [ + type: {:or, [:mfa, {:fun, 1}, :atom]}, + doc: """ + A transform function to apply to each row. 
+ + This can be either a MFA tuple (`{MyModule, :my_fun, [arg1, ...]}`), a + `&MyModule.fun/1` capture or an `Ecto.Schema` module (depending on use). + + See the documentation of `Phoenix.Sync.shape!/2` for more details. + """, + type_doc: ~s/`(map() -> [map()]) | mfa() | module()`/ + ] + ] + shape_schema_gen = fn required? -> Keyword.take( [table: [type: :string, required: required?]] ++ ShapeDefinition.schema_definition(), ShapeDefinition.public_keys() - ) + ) ++ @sync_schema_opts end @shape_definition_schema shape_schema_gen.(false) @keyword_shape_schema shape_schema_gen.(true) - @api_schema_opts [ - storage: [type: {:or, [:map, nil]}] - ] - @shape_schema NimbleOptions.new!(@shape_definition_schema) @api_schema NimbleOptions.new!(@api_schema_opts) @stream_schema Electric.Client.Stream.options_schema() @public_schema NimbleOptions.new!(@shape_definition_schema ++ @api_schema_opts) + @sync_schema NimbleOptions.new!(@sync_schema_opts) @api_schema_keys Keyword.keys(@api_schema_opts) @stream_schema_keys Keyword.keys(@stream_schema.schema) @shape_definition_keys ShapeDefinition.public_keys() + @sync_schema_keys Keyword.keys(@sync_schema_opts) # we hold the query separate from the shape definition in order to allow # for transformation of a query to a shape definition at runtime rather @@ -36,7 +53,8 @@ defmodule Phoenix.Sync.PredefinedShape do :shape_config, :api_config, :stream_config, - :query + :query, + :sync_config ] @type t :: %__MODULE__{} @@ -93,8 +111,23 @@ defmodule Phoenix.Sync.PredefinedShape do defp new(opts), do: struct(__MODULE__, opts) defp split_and_validate_opts!(opts, mode) do + {shape_config, api_config, stream_config, sync_config} = + opts + |> split_opts!() + |> validate_opts!(mode) + + [ + shape_config: shape_config, + api_config: api_config, + stream_config: stream_config, + sync_config: sync_config + ] + end + + defp split_opts!(opts) do {shape_opts, other_opts} = Keyword.split(opts, @shape_definition_keys) {api_opts, other_opts} = 
Keyword.split(other_opts, @api_schema_keys) + {sync_opts, other_opts} = Keyword.split(other_opts, @sync_schema_keys) stream_opts = case Keyword.split(other_opts, @stream_schema_keys) do @@ -106,8 +139,13 @@ defmodule Phoenix.Sync.PredefinedShape do message: "received invalid options to a shape definition: #{inspect(invalid_opts)}" end + {shape_opts, api_opts, stream_opts, sync_opts} + end + + defp validate_opts!({shape_opts, api_opts, stream_opts, sync_opts}, mode) do shape_config = validate_shape_config(shape_opts, mode) api_config = NimbleOptions.validate!(api_opts, @api_schema) + sync_config = NimbleOptions.validate!(sync_opts, @sync_schema) # remove replica value from the stream because it will override the shape # setting and since we've removed the `:replica` value earlier @@ -117,7 +155,7 @@ defmodule Phoenix.Sync.PredefinedShape do |> Enum.reject(&is_nil(elem(&1, 1))) |> Enum.reject(&(elem(&1, 0) == :replica)) - [shape_config: shape_config, api_config: api_config, stream_config: stream_config] + {shape_config, api_config, stream_config, sync_config} end # If we're defining a shape with a keyword list then we need at least the @@ -130,6 +168,85 @@ defmodule Phoenix.Sync.PredefinedShape do NimbleOptions.validate!(shape_opts, @shape_schema) end + @doc false + def new_macro!(queryable, shape_opts, caller, macro_opts \\ []) + + def new_macro!(queryable, shape_opts, caller, macro_opts) + when is_tuple(queryable) and is_list(shape_opts) do + function_caller = + case Keyword.fetch(macro_opts, :function) do + {:ok, function} -> %{caller | function: function} + :error -> caller + end + + case Macro.expand_literals(queryable, function_caller) do + schema when is_atom(schema) -> + new!(schema, macro_sanitise_opts(shape_opts, caller, macro_opts)) + + query when is_tuple(query) -> + raise ArgumentError, + message: + "Router shape configuration only accepts a Ecto.Schema module as a query." 
<> + " For Ecto.Query support please use `Phoenix.Sync.Controller.sync_render/3`" + end + end + + def new_macro!(shape, opts, caller, macro_opts) when is_list(shape) and is_list(opts) do + shape = macro_sanitise_opts(shape, caller, macro_opts) + opts = macro_sanitise_opts(opts, caller, macro_opts) + + new!(shape, opts) + end + + defp macro_sanitise_opts(opts, caller, macro_opts) do + opts + |> Keyword.replace_lazy(:storage, fn storage_ast -> + {storage, _binding} = Code.eval_quoted(storage_ast, [], caller) + storage + end) + |> Keyword.replace_lazy(:transform, fn + {:&, _, _} = transform_ast -> + context = Keyword.fetch!(macro_opts, :context) + + {transform, _binding} = + transform_ast + |> macro_validate_capture!(context) + |> Code.eval_quoted([], caller) + + transform + + {:{}, _, _} = transform_ast -> + {transform, _binding} = Code.eval_quoted(transform_ast, [], caller) + transform + + {:__aliases__, _, _} = transform_ast -> + {transform, _binding} = Code.eval_quoted(transform_ast, [], caller) + transform + end) + end + + # phoenix router does not support captures in the router + defp macro_validate_capture!(_ast, :phoenix) do + raise ArgumentError, + message: + "Invalid transform function specification in sync shape definition." <> + " When using Phoenix Router please use an MFA tuple (`transform: {Mod, :fun, [arg1, ...]}`)" + end + + # only Mod.fun/1 style captures are supported -- you can't quote a &mod.fun(&1) style capture + defp macro_validate_capture!(ast, :plug) do + case ast do + {:&, _, [{:/, _, _}]} = capture_ast -> + capture_ast + + _ -> + raise ArgumentError, + message: + "Invalid transform function specification in sync shape definition." 
<> + " Expected either a capture (&Mod.fun/1) or MFA tuple ({Mod, :fun, [arg1, ...]})" + end + end + def client(%Electric.Client{} = client, %__MODULE__{} = predefined_shape) do Electric.Client.merge_params(client, to_client_params(predefined_shape)) end @@ -183,4 +300,39 @@ defmodule Phoenix.Sync.PredefinedShape do end end end + + @doc false + def transform_fun(nil), do: nil + + def transform_fun(%__MODULE__{sync_config: sync_config}) do + transform_fun(sync_config[:transform]) + end + + def transform_fun({m, f, a}) when is_atom(m) and is_atom(f) and is_list(a) do + fn row -> List.wrap(apply(m, f, [row | a])) end + end + + def transform_fun(fun) when is_function(fun, 1) do + fn msg -> List.wrap(fun.(msg)) end + end + + def transform_fun(module) when is_atom(module) do + ecto_transform_fun(module) + end + + if Code.ensure_loaded?(Ecto) do + defp ecto_transform_fun(module) when is_atom(module) do + ecto_transform = Electric.Client.EctoAdapter.for_schema(%{}, module) + &apply_ecto_transform(&1, ecto_transform) + end + + defp apply_ecto_transform(%{"value" => value} = msg, transform_fun) do + [Map.put(msg, "value", transform_fun.(value))] + end + else + defp ecto_transform_fun(module) when is_atom(module) do + raise ArgumentError, + message: "Ecto not available: cannot generate transform function from #{inspect(module)}" + end + end end diff --git a/lib/phoenix/sync/router.ex b/lib/phoenix/sync/router.ex index 2a0a7ea..ae16dfd 100644 --- a/lib/phoenix/sync/router.ex +++ b/lib/phoenix/sync/router.ex @@ -23,7 +23,7 @@ defmodule Phoenix.Sync.Router do ## Plug Integration - Within your `Plug.Router` module, `use #{__MODULE__}` and then + Within your `Plug.Router` module, `use #{__MODULE__}` and then add your `sync` routes: defmodule MyApp.Plug.Router do @@ -42,6 +42,29 @@ defmodule Phoenix.Sync.Router do You **must** use the `copy_opts_to_assign` option in `Plug.Router` in order for the `sync` macro to get the configuration defined in your `application.ex` 
[`start/2`](`c:Application.start/2`) callback. + + ## Transforms + + You can add a `transform` function to your shapes as explained in + [`Phoenix.Sync.shape!/2`](`Phoenix.Sync#shape!/2-transforms`) but, because `sync/2` and `sync/3` are + macros, you need to use the `{module, function, args}` form when declaring + the `transform` function. + + defmodule MyApp.Router do + use Plug.Router, copy_opts_to_assign: :options + + # ... + + sync "/shapes/pending-todos", MyApp.Todos.Todo, + where: "completed = false", + transform: {MyApp.Router, :transform_todo, ["[PENDING]"]} + + def transform_todo(msg, prefix) do + Map.update!(msg, "value", fn todo -> + Map.put(todo, "title", prefix <> " " <> todo["title"]) + end) + end + end """ import Phoenix.Sync.Plug.Utils @@ -94,12 +117,12 @@ defmodule Phoenix.Sync.Router do more details on keyword-based shapes. """ defmacro sync(path, opts) when is_list(opts) do - route(env!(__CALLER__), path, build_definition(__CALLER__, opts)) + route(env!(__CALLER__), path, define_shape(opts, [], __CALLER__)) end # e.g. shape "/path", Ecto.Query.from(t in MyTable) defmacro sync(path, queryable) when is_tuple(queryable) do - route(env!(__CALLER__), path, build_shape_from_query(queryable, __CALLER__, [])) + route(env!(__CALLER__), path, define_shape(queryable, [], __CALLER__)) end @doc """ @@ -112,82 +135,43 @@ defmodule Phoenix.Sync.Router do """ # e.g. 
shape "/path", Ecto.Query.from(t in MyTable), replica: :full defmacro sync(path, queryable, opts) when is_tuple(queryable) and is_list(opts) do - route(env!(__CALLER__), path, build_shape_from_query(queryable, __CALLER__, opts)) + route( + env!(__CALLER__), + path, + define_shape(queryable, opts, __CALLER__) + ) end defp route(:plug, path, definition) do - quote do - Plug.Router.match(unquote(path), + quote bind_quoted: [path: path, shape: Macro.escape(definition)] do + Plug.Router.match(path, via: :get, to: Phoenix.Sync.Router.Shape, init_opts: %{ - shape: unquote(Macro.escape(definition)), - plug_opts_assign: @plug_assign_opts + plug_opts_assign: @plug_assign_opts, + shape: shape } ) end end defp route(:phoenix, path, definition) do - quote do + quote bind_quoted: [path: path, shape: Macro.escape(definition)] do Phoenix.Router.match( :get, - unquote(path), + path, Phoenix.Sync.Router.Shape, - %{shape: unquote(Macro.escape(definition))}, + %{shape: shape}, alias: false ) end end - defp build_definition(caller, opts) when is_list(opts) do - case Keyword.fetch(opts, :query) do - {:ok, queryable} -> - build_shape_from_query(queryable, caller, opts) - - :error -> - define_shape(caller, opts) - end - end - - defp build_shape_from_query(queryable, caller, opts) do - case Macro.expand_literals(queryable, %{caller | function: {:sync, 4}}) do - schema when is_atom(schema) -> - {storage, _binding} = Code.eval_quoted(opts[:storage], [], caller) - - Phoenix.Sync.PredefinedShape.new!( - schema, - Keyword.merge(opts, storage: storage) - ) - - query when is_tuple(query) -> - raise ArgumentError, - message: - "Router shape configuration only accepts a Ecto.Schema module as a query. 
For Ecto.Query support please use `Phoenix.Sync.Controller.sync_render/3`" - end - end - - defp define_shape(caller, opts) do - relation = build_relation(opts) - - {storage, _binding} = Code.eval_quoted(opts[:storage], [], caller) - - Phoenix.Sync.PredefinedShape.new!(Keyword.merge(opts, relation), storage: storage) - end - - defp build_relation(opts) do - case Keyword.fetch(opts, :table) do - {:ok, table} -> - [table: table] - - :error -> - raise ArgumentError, message: "Cannot build shape: no :table specified." - end - |> add_namespace(opts) - end - - defp add_namespace(table, opts) do - Keyword.merge(table, Keyword.take(opts, [:namespace])) + defp define_shape(shape, opts, caller) do + Phoenix.Sync.PredefinedShape.new_macro!(shape, opts, caller, + context: env!(caller), + function: {:sync, 3} + ) end defmodule Shape do diff --git a/lib/phoenix/sync/sandbox/producer.ex b/lib/phoenix/sync/sandbox/producer.ex index e8ed45a..a7f3231 100644 --- a/lib/phoenix/sync/sandbox/producer.ex +++ b/lib/phoenix/sync/sandbox/producer.ex @@ -13,7 +13,7 @@ if Phoenix.Sync.sandbox_enabled?() do alias Electric.Replication.LogOffset alias Electric.Replication.ShapeLogCollector - @json if(Code.ensure_loaded?(JSON), do: JSON, else: Jason) + @json Phoenix.Sync.json_library() def child_spec(opts) do {:ok, stack_id} = Keyword.fetch(opts, :stack_id) diff --git a/test/phoenix/sync/controller_test.exs b/test/phoenix/sync/controller_test.exs index 68bc733..e4ae068 100644 --- a/test/phoenix/sync/controller_test.exs +++ b/test/phoenix/sync/controller_test.exs @@ -43,6 +43,10 @@ defmodule Phoenix.Sync.ControllerTest do get "/complex", TodoController, :complex get "/interruptible", TodoController, :interruptible get "/interruptible_dynamic", TodoController, :interruptible_dynamic + get "/transform", TodoController, :transform + get "/transform-capture", TodoController, :transform_capture + get "/transform-interruptible", TodoController, :transform_interruptible + get "/transform-ecto-schema", 
TodoController, :transform_organization end end @@ -55,6 +59,40 @@ defmodule Phoenix.Sync.ControllerTest do Code.ensure_loaded!(Support.Todo) Code.ensure_loaded!(Support.Repo) + @organizations [ + table: { + "organizations", + [ + "id int8 not null primary key generated always as identity", + "name text", + "external_id uuid", + "address jsonb", + "inserted_at timestamp with time zone", + "updated_at timestamp with time zone" + ] + }, + data: { + "organizations", + ["name", "external_id", "address", "inserted_at", "updated_at"], + [ + [ + "one", + Ecto.UUID.dump!("dfca7ea8-f0d0-47cf-984d-d170f6b989d3"), + %{id: "a8a2cd54-f892-449f-8a1b-1a4a88034bf3", street: "High Street", number: 12}, + ~U[2025-01-01T12:34:14Z], + ~U[2025-01-01T12:34:14Z] + ], + [ + "two", + Ecto.UUID.dump!("017fad69-0603-4dd5-811f-b4f83f45e7af"), + %{id: "82f22f21-8526-4976-84b2-093ab905394d", street: "Market Street", number: 3}, + ~U[2025-01-02T12:34:14Z], + ~U[2025-01-02T12:34:14Z] + ] + ] + } + ] + @moduletag table: { "todos", [ @@ -185,6 +223,125 @@ defmodule Phoenix.Sync.ControllerTest do %{"headers" => %{"operation" => "insert"}, "value" => %{"title" => "two"}} ] = Jason.decode!(resp.resp_body) end + + @tag transform: true + test "accepts a map function as a mfa", _ctx do + resp = + Phoenix.ConnTest.build_conn() + |> Phoenix.ConnTest.get("/todos/transform", %{offset: "-1"}) + + assert resp.status == 200 + assert Plug.Conn.get_resp_header(resp, "electric-offset") == ["0_0"] + + assert [ + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{"title" => "one", "merged" => "mapping-insert-1-one"} + }, + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{"title" => "two", "merged" => "mapping-insert-2-two"} + }, + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{"title" => "three", "merged" => "mapping-insert-3-three"} + } + ] = Jason.decode!(resp.resp_body) + end + + @tag transform: true + test "accepts a map function as a capture", _ctx do + resp = + 
Phoenix.ConnTest.build_conn() + |> Phoenix.ConnTest.get("/todos/transform-capture", %{offset: "-1"}) + + assert resp.status == 200 + assert Plug.Conn.get_resp_header(resp, "electric-offset") == ["0_0"] + + assert [ + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{"title" => "one", "merged" => "mapping-insert-1-one"} + }, + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{"title" => "two", "merged" => "mapping-insert-2-two"} + }, + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{"title" => "three", "merged" => "mapping-insert-3-three"} + } + ] = Jason.decode!(resp.resp_body) + end + + @tag transform: true + test "works for an interruptible shape", _ctx do + agent = start_supervised!({Agent, fn -> [false] end}) + + Process.register(agent, :interruptible_dynamic_agent) + + resp = + Phoenix.ConnTest.build_conn() + |> Phoenix.ConnTest.get("/todos/transform-interruptible", %{offset: "-1"}) + + assert resp.status == 200 + assert Plug.Conn.get_resp_header(resp, "electric-offset") == ["0_0"] + + assert [ + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{"title" => "one", "merged" => "mapping-insert-1-one"} + }, + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{"title" => "two", "merged" => "mapping-insert-2-two"} + } + ] = Jason.decode!(resp.resp_body) + end + + @tag @organizations + @tag transform: true + test "allows for transforming via an ecto schema", _ctx do + resp = + Phoenix.ConnTest.build_conn() + |> Phoenix.ConnTest.get("/todos/transform-ecto-schema", %{offset: "-1"}) + + assert resp.status == 200 + assert Plug.Conn.get_resp_header(resp, "electric-offset") == ["0_0"] + + assert [ + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{ + "external_id" => "org_37fh5khq2bd47gcn2fypnomj2m", + "name" => "one", + "address" => %{ + "id" => "a8a2cd54-f892-449f-8a1b-1a4a88034bf3", + "number" => 12, + "street" => "High Street" + }, + "id" => 1, + "inserted_at" => "2025-01-01T12:34:14", + "updated_at" 
=> "2025-01-01T12:34:14" + } + }, + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{ + "external_id" => "org_af7222igang5lai7wt4d6rphv4", + "name" => "two", + "address" => %{ + "id" => "82f22f21-8526-4976-84b2-093ab905394d", + "number" => 3, + "street" => "Market Street" + }, + "id" => 2, + "inserted_at" => "2025-01-02T12:34:14", + "updated_at" => "2025-01-02T12:34:14" + } + } + ] = Jason.decode!(resp.resp_body) + end end defmodule PlugRouter do diff --git a/test/phoenix/sync/live_view_test.exs b/test/phoenix/sync/live_view_test.exs index 0c62d83..55e3cb9 100644 --- a/test/phoenix/sync/live_view_test.exs +++ b/test/phoenix/sync/live_view_test.exs @@ -331,7 +331,7 @@ defmodule Phoenix.Sync.LiveViewTest do <:script :let={configuration} phx-no-curly-interpolation> root.render(React.createElement(MyApp, { client_config: <%= configuration %> }, null)) - + """) diff --git a/test/phoenix/sync/predefined_shape_test.exs b/test/phoenix/sync/predefined_shape_test.exs index d14f908..9be02b6 100644 --- a/test/phoenix/sync/predefined_shape_test.exs +++ b/test/phoenix/sync/predefined_shape_test.exs @@ -10,7 +10,7 @@ defmodule Phoenix.Sync.PredefinedShapeTest do schema "cows" do field :name, :string field :age, :integer - field :breed, :string + field :breed, Ecto.Enum, values: [:holstein, :angus, :hereford, :jersey] end def changeset(data \\ %__MODULE__{}, params) do @@ -19,9 +19,7 @@ defmodule Phoenix.Sync.PredefinedShapeTest do data |> cast(params, [:name, :age, :breed]) |> validate_number(:age, greater_than: 0) - |> validate_required([:name]) - |> update_change(:breed, &String.downcase/1) - |> validate_inclusion(:breed, ~w(holstein angus hereford jersey)) + |> validate_required([:name, :breed]) end end @@ -146,5 +144,109 @@ defmodule Phoenix.Sync.PredefinedShapeTest do assert {%{__struct__: ShapeDefinition}, [live: false, errors: :stream]} = PredefinedShape.to_stream_params(ps) end + + @tag :transform + test "transform function is accepted as mfa" do + ps = + 
PredefinedShape.new!( + Cow, + namespace: "test", + where: "completed = $1", + params: [true], + replica: :full, + columns: ["id", "title"], + transform: {__MODULE__, :map_cow_dupe, []}, + storage: %{compaction: :disabled} + ) + + assert fun = PredefinedShape.transform_fun(ps) + assert is_function(fun, 1) + + assert [:msg, :msg] = fun.(:msg) + end + + @tag :transform + test "transform function is accepted as a capture" do + ps = + PredefinedShape.new!( + Cow, + namespace: "test", + where: "completed = $1", + params: [true], + replica: :full, + columns: ["id", "title"], + transform: &map_cow/1, + storage: %{compaction: :disabled} + ) + + assert fun = PredefinedShape.transform_fun(ps) + assert is_function(fun, 1) + + assert [:msg] = fun.(:msg) + end + + @tag :transform + test "ecto schema modules are accepted as a transform argument" do + ps = + PredefinedShape.new!( + Cow, + namespace: "test", + where: "completed = $1", + params: [true], + replica: :full, + transform: Cow + ) + + assert fun = PredefinedShape.transform_fun(ps) + assert is_function(fun, 1) + + assert [ + %{ + "key" => "key", + "headers" => %{"operation" => "insert"}, + "value" => %Cow{name: "Daisy", age: 12, breed: :jersey} + } + ] = + fun.(%{ + "key" => "key", + "headers" => %{"operation" => "insert"}, + "value" => %{"name" => "Daisy", "age" => 12, "breed" => "jersey"} + }) + end + + @tag :transform + test "transform function is wrapped to return a list" do + ps = + PredefinedShape.new!( + Cow, + namespace: "test", + where: "completed = $1", + params: [true], + replica: :full, + columns: ["id", "title"], + transform: {__MODULE__, :map_cow, []}, + storage: %{compaction: :disabled} + ) + + assert fun = PredefinedShape.transform_fun(ps) + assert is_function(fun, 1) + + assert [:msg] = fun.(:msg) + end + + @tag :transform + test "shape with no transform fun" do + ps = PredefinedShape.new!(table: "cows") + + assert nil == PredefinedShape.transform_fun(ps) + end + + @tag :transform + test "transform_fun 
accepts and returns nil" do + assert nil == PredefinedShape.transform_fun(nil) + end end + + def map_cow(msg), do: msg + def map_cow_dupe(msg), do: [msg, msg] end diff --git a/test/phoenix/sync/router_test.exs b/test/phoenix/sync/router_test.exs index d10a394..c86a964 100644 --- a/test/phoenix/sync/router_test.exs +++ b/test/phoenix/sync/router_test.exs @@ -67,6 +67,19 @@ defmodule Phoenix.Sync.RouterTest do sync "/query-config2", Support.Todo, replica: :full, storage: %{compaction: :disabled} sync "/typo", Support.Todoo + + # TODO: see if there's a way to pass captures through + # sync "/map/query-capture", Support.Todo, transform: &Phoenix.Sync.RouterTest.map_todo/1 + # sync "/map/keyword-capture", table: "todos", transform: &Phoenix.Sync.RouterTest.map_todo/1 + + sync "/map/query-mfa", Support.Todo, + transform: {Phoenix.Sync.RouterTest, :map_todo, ["query-mfa"]} + + sync "/map/keyword-mfa", + table: "todos", + transform: {Phoenix.Sync.RouterTest, :map_todo, ["keyword-mfa"]} + + sync "/map/ecto-schema", Support.Organization, transform: Support.Organization end scope "/namespaced-sync", WebNamespace do @@ -74,6 +87,15 @@ defmodule Phoenix.Sync.RouterTest do end end + def map_todo( + %{"headers" => %{"operation" => op}} = msg, + route \\ "capture" + ) do + Map.update!(msg, "value", fn value -> + Map.put(value, "merged", "#{route}-#{op}-#{value["id"]}-#{value["title"]}") + end) + end + defmodule Endpoint do use Phoenix.Endpoint, otp_app: :phoenix_sync @@ -81,8 +103,43 @@ defmodule Phoenix.Sync.RouterTest do end Code.ensure_loaded!(Support.Todo) + Code.ensure_loaded!(Support.Organization) Code.ensure_loaded!(Support.Repo) + @organizations [ + table: { + "organizations", + [ + "id int8 not null primary key generated always as identity", + "name text", + "external_id uuid", + "address jsonb", + "inserted_at timestamp with time zone", + "updated_at timestamp with time zone" + ] + }, + data: { + "organizations", + ["name", "external_id", "address", "inserted_at", 
"updated_at"], + [ + [ + "one", + Ecto.UUID.dump!("dfca7ea8-f0d0-47cf-984d-d170f6b989d3"), + %{id: "a8a2cd54-f892-449f-8a1b-1a4a88034bf3", street: "High Street", number: 12}, + ~U[2025-01-01T12:34:14Z], + ~U[2025-01-01T12:34:14Z] + ], + [ + "two", + Ecto.UUID.dump!("017fad69-0603-4dd5-811f-b4f83f45e7af"), + %{id: "82f22f21-8526-4976-84b2-093ab905394d", street: "Market Street", number: 3}, + ~U[2025-01-02T12:34:14Z], + ~U[2025-01-02T12:34:14Z] + ] + ] + } + ] + setup [ :define_endpoint, :with_stack_id_from_test, @@ -427,6 +484,104 @@ defmodule Phoenix.Sync.RouterTest do |> Phoenix.ConnTest.get("/sync/typo", %{offset: "-1"}) end end + + @tag table: { + "todos", + [ + "id int8 not null primary key generated always as identity", + "title text", + "completed boolean default false" + ] + } + @tag data: { + "todos", + ["title", "completed"], + [["one", false], ["two", false]] + } + + @tag transform: true + test "mapping values" do + resp = + Phoenix.ConnTest.build_conn() + |> Phoenix.ConnTest.get("/sync/map/query-mfa", %{offset: "-1"}) + + assert resp.status == 200 + assert Plug.Conn.get_resp_header(resp, "electric-offset") == ["0_0"] + + assert [ + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{"merged" => "query-mfa-insert-1-one", "title" => "one"} + }, + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{"merged" => "query-mfa-insert-2-two", "title" => "two"} + } + ] = Jason.decode!(resp.resp_body) + end + + @tag transform: true + test "captures in transform are not supported in phoenix.router" do + assert_raise ArgumentError, + ~r/Invalid transform function specification in sync shape definition\. 
When using Phoenix Router please use an MFA tuple \(`transform: \{Mod, :fun, \[arg1, \.\.\.\]\}`\)/, + fn -> + Code.compile_string(""" + defmodule InvalidCaptureRouter#{System.unique_integer([:positive, :monotonic])} do + use Phoenix.Router + + import Phoenix.Sync.Router + + scope "/sync" do + sync "/map/query-capture", Support.Todo, transform: &Phoenix.Sync.RouterTest.map_todo/1 + end + end + """) + end + end + + @tag @organizations + @tag transform: true + test "transform via ecto schema modules" do + resp = + Phoenix.ConnTest.build_conn() + |> Phoenix.ConnTest.get("/sync/map/ecto-schema", %{offset: "-1"}) + + assert resp.status == 200 + assert Plug.Conn.get_resp_header(resp, "electric-offset") == ["0_0"] + + assert [ + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{ + "external_id" => "org_37fh5khq2bd47gcn2fypnomj2m", + "name" => "one", + "address" => %{ + "id" => "a8a2cd54-f892-449f-8a1b-1a4a88034bf3", + "number" => 12, + "street" => "High Street" + }, + "id" => 1, + "inserted_at" => "2025-01-01T12:34:14", + "updated_at" => "2025-01-01T12:34:14" + } + }, + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{ + "external_id" => "org_af7222igang5lai7wt4d6rphv4", + "name" => "two", + "address" => %{ + "id" => "82f22f21-8526-4976-84b2-093ab905394d", + "number" => 3, + "street" => "Market Street" + }, + "id" => 2, + "inserted_at" => "2025-01-02T12:34:14", + "updated_at" => "2025-01-02T12:34:14" + } + } + ] = Jason.decode!(resp.resp_body) + end end describe "Plug.Router - shape/2" do @@ -478,6 +633,16 @@ defmodule Phoenix.Sync.RouterTest do sync "/shapes/query-module", Support.Todo, where: "completed = false" + sync "/shapes/map-module", Support.Todo, + where: "completed = false", + transform: {Phoenix.Sync.RouterTest, :map_todo, ["module-mfa"]} + + sync "/shapes/map-capture", Support.Todo, + where: "completed = false", + transform: &Phoenix.Sync.RouterTest.map_todo/1 + + sync "/shapes/map-ecto", Support.Organization, transform: 
Support.Organization + forward "/namespace", to: MyScope match _ do @@ -537,6 +702,120 @@ defmodule Phoenix.Sync.RouterTest do end end + @tag transform: true + test "response mapping", ctx do + resp = + conn(:get, "/shapes/map-module", %{"offset" => "-1"}) + |> MyRouter.call(ctx.plug_opts) + + assert resp.status == 200 + assert Plug.Conn.get_resp_header(resp, "electric-offset") == ["0_0"] + + assert [ + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{"merged" => "module-mfa-insert-1-one", "title" => "one"} + }, + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{"merged" => "module-mfa-insert-2-two", "title" => "two"} + }, + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{"merged" => "module-mfa-insert-3-three", "title" => "three"} + } + ] = Jason.decode!(resp.resp_body) + end + + @tag transform: true + test "response mapping with capture", ctx do + resp = + conn(:get, "/shapes/map-capture", %{"offset" => "-1"}) + |> MyRouter.call(ctx.plug_opts) + + assert resp.status == 200 + assert Plug.Conn.get_resp_header(resp, "electric-offset") == ["0_0"] + + assert [ + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{"merged" => "capture-insert-1-one", "title" => "one"} + }, + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{"merged" => "capture-insert-2-two", "title" => "two"} + }, + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{"merged" => "capture-insert-3-three", "title" => "three"} + } + ] = Jason.decode!(resp.resp_body) + end + + @tag transform: true + test "only &module.fun/1 style captures are accepted" do + assert_raise ArgumentError, + ~r/Invalid transform function specification in sync shape definition/, + fn -> + Code.compile_string(""" + defmodule InvalidCaptureRouter#{System.unique_integer([:positive, :monotonic])} do + use Plug.Router, copy_opts_to_assign: :options + use Phoenix.Sync.Router + + plug :match + plug :dispatch + + sync "/shapes/todos", Support.Todo, + transform: 
&#{inspect(__MODULE__)}.map_todo(&1, "invalid") + end + """) + end + end + + @tag @organizations + @tag transform: true + test "ecto schema modules are valid transforms", ctx do + resp = + conn(:get, "/shapes/map-ecto", %{"offset" => "-1"}) + |> MyRouter.call(ctx.plug_opts) + + assert resp.status == 200 + assert Plug.Conn.get_resp_header(resp, "electric-offset") == ["0_0"] + + assert [ + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{ + "external_id" => "org_37fh5khq2bd47gcn2fypnomj2m", + "name" => "one", + "address" => %{ + "id" => "a8a2cd54-f892-449f-8a1b-1a4a88034bf3", + "number" => 12, + "street" => "High Street" + }, + "id" => 1, + "inserted_at" => "2025-01-01T12:34:14", + "updated_at" => "2025-01-01T12:34:14" + } + }, + %{ + "headers" => %{"operation" => "insert"}, + "value" => %{ + "external_id" => "org_af7222igang5lai7wt4d6rphv4", + "name" => "two", + "address" => %{ + "id" => "82f22f21-8526-4976-84b2-093ab905394d", + "number" => 3, + "street" => "Market Street" + }, + "id" => 2, + "inserted_at" => "2025-01-02T12:34:14", + "updated_at" => "2025-01-02T12:34:14" + } + } + ] = Jason.decode!(resp.resp_body) + end + test "returns CORS headers", ctx do resp = conn(:get, "/shapes/todos", %{"offset" => "-1"}) diff --git a/test/support/controllers/todo_controller.ex b/test/support/controllers/todo_controller.ex index ee04e11..6f95d6a 100644 --- a/test/support/controllers/todo_controller.ex +++ b/test/support/controllers/todo_controller.ex @@ -47,4 +47,35 @@ defmodule Phoenix.Sync.LiveViewTest.TodoController do ) end) end + + def transform(conn, params) do + sync_render(conn, params, Support.Todo, transform: {__MODULE__, :map_todo, ["mapping"]}) + end + + def transform_capture(conn, params) do + sync_render(conn, params, Support.Todo, transform: &map_todo(&1, "mapping")) + end + + def transform_interruptible(conn, params) do + sync_render(conn, params, fn -> + shape_params = Agent.get(:interruptible_dynamic_agent, & &1) + + Phoenix.Sync.shape!( + table: 
"todos", + where: "completed = $1", + params: shape_params, + transform: &map_todo(&1, "mapping") + ) + end) + end + + def transform_organization(conn, params) do + sync_render(conn, params, Support.Organization, transform: Support.Organization) + end + + def map_todo(%{"headers" => %{"operation" => op}} = msg, route) do + Map.update!(msg, "value", fn value -> + Map.put(value, "merged", "#{route}-#{op}-#{value["id"]}-#{value["title"]}") + end) + end end diff --git a/test/support/organization.ex b/test/support/organization.ex new file mode 100644 index 0000000..19e0b43 --- /dev/null +++ b/test/support/organization.ex @@ -0,0 +1,25 @@ +defmodule Support.Organization do + use Ecto.Schema + + defmodule Address do + use Ecto.Schema + + @derive Jason.Encoder + + embedded_schema do + field :street, :string + field :number, :integer + end + end + + @derive {Jason.Encoder, except: [:__meta__]} + + schema "organizations" do + field :name, :string + field :external_id, Support.ULID, prefix: "org" + + embeds_one :address, Address + + timestamps() + end +end diff --git a/test/support/ulid.ex b/test/support/ulid.ex new file mode 100644 index 0000000..05274a4 --- /dev/null +++ b/test/support/ulid.ex @@ -0,0 +1,60 @@ +defmodule Support.ULID do + # A dumb parameterized type that stores a UUID with a prefix in the application + # useful for testing the handling of uuid-format columns in e.g. 
the query + # generator + use Ecto.ParameterizedType + + @impl Ecto.ParameterizedType + def init(opts) do + Map.new(opts) + end + + @impl Ecto.ParameterizedType + def type(_), do: :uuid + + @impl Ecto.ParameterizedType + def cast(value, %{prefix: prefix}) do + if String.starts_with?(value, prefix <> "_") do + {:ok, value} + else + :error + end + end + + @impl Ecto.ParameterizedType + def load(nil, _, _), do: {:ok, nil} + + def load(data, _loader, %{prefix: prefix}) when is_binary(data) and byte_size(data) == 16 do + with {:ok, _uuid} = Ecto.UUID.load(data) do + {:ok, prefix <> "_" <> Base.encode32(data, case: :lower, padding: false)} + end + end + + def load( + <<_::64, ?-, _::32, ?-, _::32, ?-, _::32, ?-, _::96>> = string, + _loader, + %{prefix: prefix} + ) do + with {:ok, bytes} <- Ecto.UUID.dump(string) do + {:ok, prefix <> "_" <> Base.encode32(bytes, case: :lower, padding: false)} + end + end + + @impl Ecto.ParameterizedType + def dump(nil, _, _), do: {:ok, nil} + + def dump(ulid, _, %{prefix: prefix}) do + with [^prefix, uuid] <- String.split(ulid, "_", parts: 2) do + Base.decode32(uuid, case: :lower, padding: false) + else + _ -> :error + end + end + + def dump(_, _, _), do: :error + + @impl Ecto.ParameterizedType + def equal?(nil, nil, _), do: true + def equal?(val, val, _), do: true + def equal?(_, _, _), do: false +end