Skip to content

Commit 36825f5

Browse files
authored
fix: handle stack not ready response by returning to client (#104)
Fixes #101 (along with bump to electric 1.1.9)
1 parent 00d0d13 commit 36825f5

File tree

5 files changed

+65
-47
lines changed

5 files changed

+65
-47
lines changed

lib/phoenix/sync/controller.ex

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -231,57 +231,57 @@ defmodule Phoenix.Sync.Controller do
231231
end
232232

233233
defp sync_render_call(conn, api, params, predefined_shape) do
234-
{:ok, shape_api} = Phoenix.Sync.Adapter.PlugApi.predefined_shape(api, predefined_shape)
235-
236-
Phoenix.Sync.Adapter.PlugApi.call(shape_api, CORS.call(conn), params)
234+
Phoenix.Sync.Electric.api_predefined_shape(conn, api, predefined_shape, fn conn, shape_api ->
235+
Phoenix.Sync.Adapter.PlugApi.call(shape_api, CORS.call(conn), params)
236+
end)
237237
end
238238

239239
defp interruptible_call(conn, api, params, shape_fun) do
240240
predefined_shape = call_shape_fun(shape_fun)
241241

242-
{:ok, shape_api} = Adapter.PlugApi.predefined_shape(api, predefined_shape)
242+
Phoenix.Sync.Electric.api_predefined_shape(conn, api, predefined_shape, fn conn, shape_api ->
243+
{:ok, key} = ShapeRequestRegistry.register_shape(predefined_shape)
243244

244-
{:ok, key} = ShapeRequestRegistry.register_shape(predefined_shape)
245+
try do
246+
parent = self()
247+
start_time = now()
245248

246-
try do
247-
parent = self()
248-
start_time = now()
249-
250-
{:ok, pid} =
251-
Task.start_link(fn ->
252-
send(parent, {:response, self(), Adapter.PlugApi.call(shape_api, conn, params)})
253-
end)
249+
{:ok, pid} =
250+
Task.start_link(fn ->
251+
send(parent, {:response, self(), Adapter.PlugApi.call(shape_api, conn, params)})
252+
end)
254253

255-
ref = Process.monitor(pid)
254+
ref = Process.monitor(pid)
256255

257-
receive do
258-
{:interrupt_shape, ^key, :server_interrupt} ->
259-
Process.demonitor(ref, [:flush])
260-
Process.unlink(pid)
261-
Process.exit(pid, :kill)
262-
# immediately retry the same request -- if the shape_fun returns a
263-
# different shape the client will receive a must-refetch response but
264-
# if the shape is the same then the request will continue with no
265-
# interruption.
266-
#
267-
# if possible adjust the long poll timeout to account for the time
268-
# already spent before the interrupt.
256+
receive do
257+
{:interrupt_shape, ^key, :server_interrupt} ->
258+
Process.demonitor(ref, [:flush])
259+
Process.unlink(pid)
260+
Process.exit(pid, :kill)
261+
# immediately retry the same request -- if the shape_fun returns a
262+
# different shape the client will receive a must-refetch response but
263+
# if the shape is the same then the request will continue with no
264+
# interruption.
265+
#
266+
# if possible adjust the long poll timeout to account for the time
267+
# already spent before the interrupt.
269268

270-
api = reduce_long_poll_timeout(api, start_time)
269+
api = reduce_long_poll_timeout(api, start_time)
271270

272-
interruptible_call(conn, api, params, shape_fun)
271+
interruptible_call(conn, api, params, shape_fun)
273272

274-
{:response, ^pid, conn} ->
275-
Process.demonitor(ref, [:flush])
273+
{:response, ^pid, conn} ->
274+
Process.demonitor(ref, [:flush])
276275

277-
conn
276+
conn
278277

279-
{:DOWN, ^ref, :process, _pid, reason} ->
280-
Plug.Conn.send_resp(conn, 500, inspect(reason))
278+
{:DOWN, ^ref, :process, _pid, reason} ->
279+
Plug.Conn.send_resp(conn, 500, inspect(reason))
280+
end
281+
after
282+
ShapeRequestRegistry.unregister_shape(key)
281283
end
282-
after
283-
ShapeRequestRegistry.unregister_shape(key)
284-
end
284+
end)
285285
end
286286

287287
defp interruptible_call?(params) do

lib/phoenix/sync/electric.ex

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,23 @@ defmodule Phoenix.Sync.Electric do
572572
)
573573
end
574574
end
575+
576+
@doc false
577+
def api_predefined_shape(conn, api, shape, response_fun) when is_function(response_fun, 2) do
578+
case Phoenix.Sync.Adapter.PlugApi.predefined_shape(api, shape) do
579+
{:ok, shape_api} ->
580+
# response_fun should return conn
581+
response_fun.(conn, shape_api)
582+
583+
# Only the embedded api will ever return an error from predefined_shape/2
584+
# when the stack isn't ready (or the params are invalid, e.g. bad table).
585+
# The client adapter just configures the client with the shape
586+
# parameters, which can't error.
587+
{:error, response} ->
588+
conn
589+
|> Plug.Conn.send_resp(response.status, Enum.into(response.body, []))
590+
end
591+
end
575592
end
576593

577594
if Code.ensure_loaded?(Electric.Shapes.Api) do

lib/phoenix/sync/router.ex

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -214,14 +214,14 @@ defmodule Phoenix.Sync.Router do
214214
end
215215

216216
defp serve_shape(conn, api, shape) do
217-
{:ok, shape_api} = Phoenix.Sync.Adapter.PlugApi.predefined_shape(api, shape)
218-
219-
conn =
220-
conn
221-
|> Plug.Conn.fetch_query_params()
222-
|> Phoenix.Sync.Plug.CORS.call()
223-
224-
Phoenix.Sync.Adapter.PlugApi.call(shape_api, conn, conn.params)
217+
Phoenix.Sync.Electric.api_predefined_shape(conn, api, shape, fn conn, shape_api ->
218+
conn =
219+
conn
220+
|> Plug.Conn.fetch_query_params()
221+
|> Phoenix.Sync.Plug.CORS.call()
222+
223+
Phoenix.Sync.Adapter.PlugApi.call(shape_api, conn, conn.params)
224+
end)
225225
end
226226
end
227227
end

lib/phoenix/sync/sandbox/api_adapter.ex

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ if Phoenix.Sync.sandbox_enabled?() do
2020

2121
def call(%{shape: shape} = _adapter, conn, params) do
2222
shape_api = lookup_api!()
23-
{:ok, shape_api} = PlugApi.predefined_shape(shape_api, shape)
2423

25-
PlugApi.call(shape_api, conn, params)
24+
Phoenix.Sync.Electric.api_predefined_shape(conn, shape_api, shape, fn conn, shape_api ->
25+
PlugApi.call(shape_api, conn, params)
26+
end)
2627
end
2728

2829
defp lookup_api!() do

mix.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
"ecto": {:hex, :ecto, "3.13.2", "7d0c0863f3fc8d71d17fc3ad3b9424beae13f02712ad84191a826c7169484f01", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "669d9291370513ff56e7b7e7081b7af3283d02e046cf3d403053c557894a0b3e"},
1616
"ecto_sql": {:hex, :ecto_sql, "3.13.2", "a07d2461d84107b3d037097c822ffdd36ed69d1cf7c0f70e12a3d1decf04e2e1", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.13.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.7", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.19 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "539274ab0ecf1a0078a6a72ef3465629e4d6018a3028095dc90f60a19c371717"},
1717
"electric": {:hex, :electric, "1.1.9", "b0a7774556bf306ffe1b4e4ceb270d0e9dfafdd38044b3ab9f98db8e5ead7bb4", [:mix], [{:backoff, "~> 1.1", [hex: :backoff, repo: "hexpm", optional: false]}, {:bandit, "~> 1.6", [hex: :bandit, repo: "hexpm", optional: false]}, {:dotenvy, "~> 1.1", [hex: :dotenvy, repo: "hexpm", optional: false]}, {:ecto, "~> 3.12", [hex: :ecto, repo: "hexpm", optional: false]}, {:electric_cubdb, "~> 2.0", [hex: :electric_cubdb, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.5", [hex: :opentelemetry, repo: "hexpm", optional: true]}, {:opentelemetry_exporter, "~> 1.8", [hex: :opentelemetry_exporter, repo: "hexpm", optional: true]}, {:opentelemetry_semantic_conventions, "~> 1.27", [hex: :opentelemetry_semantic_conventions, repo: "hexpm", optional: false]}, {:opentelemetry_telemetry, "~> 1.1", [hex: :opentelemetry_telemetry, repo: "hexpm", optional: false]}, {:otel_metric_exporter, "~> 0.3.11", [hex: :otel_metric_exporter, repo: "hexpm", optional: true]}, {:pg_query_ex, "0.9.0", [hex: :pg_query_ex, repo: "hexpm", optional: false]}, {:plug, "~> 1.17", [hex: :plug, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.20", [hex: :postgrex, repo: "hexpm", optional: false]}, {:remote_ip, "~> 1.2", [hex: :remote_ip, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}, {:retry, "~> 0.19", [hex: :retry, repo: "hexpm", optional: false]}, {:sentry, "~> 11.0", [hex: :sentry, repo: "hexpm", optional: true]}, {:stream_split, "~> 0.1", [hex: :stream_split, repo: "hexpm", optional: false]}, {:telemetry_metrics_prometheus_core, "~> 1.1", [hex: :telemetry_metrics_prometheus_core, repo: "hexpm", optional: true]}, {:telemetry_metrics_statsd, "~> 0.7", [hex: :telemetry_metrics_statsd, repo: "hexpm", optional: true]}, {:telemetry_poller, "~> 1.2", [hex: :telemetry_poller, repo: "hexpm", optional: false]}, {:tls_certificate_check, "~> 1.27", [hex: :tls_certificate_check, repo: "hexpm", optional: false]}, {:tz, "~> 0.28", [hex: :tz, repo: "hexpm", optional: false]}], "hexpm", "cd8beea1d005424fd24aa863e85a707e49b770accd23e66066f6c86524643464"},
18-
"electric_client": {:hex, :electric_client, "0.7.0", "ffe9ba0137e0c8a67f6935adbe42e731bbf1277f06d22cb3ad75c1d18e9647db", [:mix], [{:ecto_sql, "~> 3.12", [hex: :ecto_sql, repo: "hexpm", optional: true]}, {:electric, "~> 1.1.1", [hex: :electric, repo: "hexpm", optional: true]}, {:gen_stage, "~> 1.2", [hex: :gen_stage, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "8ad822b150efb28282c4141c57f22008e87c19e066d7ff02eba7e3d276af781d"},
18+
"electric_client": {:hex, :electric_client, "0.7.2", "06f221fa7379d41ab4fb771c9cf78f26654d7c265f61faffa8c31e6b73073224", [:mix], [{:ecto_sql, "~> 3.12", [hex: :ecto_sql, repo: "hexpm", optional: true]}, {:electric, "~> 1.1.1", [hex: :electric, repo: "hexpm", optional: true]}, {:gen_stage, "~> 1.2", [hex: :gen_stage, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "4036796cc21767917f1c1c72541b0865a5585b9b4a59ccdb15b99af9c457c97e"},
1919
"electric_cubdb": {:hex, :electric_cubdb, "2.0.2", "36f86e3c52dc26f4e077a49fbef813b1a38d3897421cece851f149190b34c16c", [:mix], [], "hexpm", "0c0e24b31fb76ad1b33c5de2ab35c41a4ff9da153f5c1f9b15e2de78575acaf2"},
2020
"elixir_make": {:hex, :elixir_make, "0.9.0", "6484b3cd8c0cee58f09f05ecaf1a140a8c97670671a6a0e7ab4dc326c3109726", [:mix], [], "hexpm", "db23d4fd8b757462ad02f8aa73431a426fe6671c80b200d9710caf3d1dd0ffdb"},
2121
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},

0 commit comments

Comments
 (0)