|
| 1 | +defmodule Phoenix.Sync.Sandbox.Fetch do |
| 2 | + @moduledoc false |
| 3 | + |
| 4 | + alias Electric.Client |
| 5 | + alias Electric.Client.Fetch |
| 6 | + |
| 7 | + require Logger |
| 8 | + |
| 9 | + @callback request(Client.t(), Fetch.Request.t(), opts :: Keyword.t()) :: |
| 10 | + Fetch.Response.t() | {:error, Fetch.Response.t() | term()} |
| 11 | + |
| 12 | + @behaviour Electric.Client.Fetch.Pool |
| 13 | + |
| 14 | + def name(stack_id) do |
| 15 | + Phoenix.Sync.Sandbox.name({__MODULE__, stack_id}) |
| 16 | + end |
| 17 | + |
| 18 | + @impl Electric.Client.Fetch.Pool |
| 19 | + def request(%Client{} = client, %Fetch.Request{} = request, opts) do |
| 20 | + {:ok, stack_id} = Keyword.fetch(opts, :stack_id) |
| 21 | + |
| 22 | + request_id = request_id(client, request, stack_id) |
| 23 | + |
| 24 | + # The monitor process is unique to the request and launches the actual |
| 25 | + # request as a linked process. |
| 26 | + # |
| 27 | + # This coalesces requests, so no matter how many simultaneous |
| 28 | + # clients we have, we only ever make one request to the backend. |
| 29 | + {:ok, monitor_pid} = start_monitor(stack_id, request_id, request, client) |
| 30 | + |
| 31 | + try do |
| 32 | + ref = Fetch.Monitor.register(monitor_pid, self()) |
| 33 | + |
| 34 | + Fetch.Monitor.wait(ref) |
| 35 | + catch |
| 36 | + :exit, {reason, _} -> |
| 37 | + Logger.debug(fn -> |
| 38 | + "Request process ended with reason #{inspect(reason)} before we could register. Re-attempting." |
| 39 | + end) |
| 40 | + |
| 41 | + request(client, request, opts) |
| 42 | + end |
| 43 | + end |
| 44 | + |
| 45 | + defp start_monitor(stack_id, request_id, request, client) do |
| 46 | + DynamicSupervisor.start_child( |
| 47 | + name(stack_id), |
| 48 | + {Electric.Client.Fetch.Monitor, {request_id, request, client}} |
| 49 | + ) |
| 50 | + |> return_existing() |
| 51 | + end |
| 52 | + |
| 53 | + defp return_existing({:ok, pid}), do: {:ok, pid} |
| 54 | + defp return_existing({:error, {:already_started, pid}}), do: {:ok, pid} |
| 55 | + defp return_existing(error), do: error |
| 56 | + |
| 57 | + defp request_id(%Client{fetch: {fetch_impl, _}}, %Fetch.Request{} = request, stack_id) do |
| 58 | + { |
| 59 | + fetch_impl, |
| 60 | + stack_id, |
| 61 | + URI.to_string(request.endpoint), |
| 62 | + request.headers, |
| 63 | + Fetch.Request.params(request) |
| 64 | + } |
| 65 | + end |
| 66 | +end |
| 67 | + |
0 commit comments