Skip to content

Commit 59ac014

Browse files
authored
fix: A send plug response within the request process (#111)
Bandit (and Electric) require responses to be consumed/sent within the original request process. Also pin the electric version to one that we currently support and fix a bug with the plug version of the sync_render function. Fixes #110
1 parent 859e514 commit 59ac014

File tree

9 files changed

+149
-26
lines changed

9 files changed

+149
-26
lines changed

apps/plug_sync/lib/plug_sync/router.ex

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
defmodule PlugSync.Router do
22
use Plug.Router, copy_opts_to_assign: :options
33
use Phoenix.Sync.Router
4+
use Phoenix.Sync.Controller
45

6+
plug Plug.Logger, log: :debug
57
plug :match
68
plug :dispatch
79

810
sync "/items-mapped", table: "items", transform: &PlugSync.Router.map_item/1
911

12+
get "/items-interruptible" do
13+
sync_render(conn, fn -> [table: "items"] end)
14+
end
15+
1016
match _ do
1117
send_resp(conn, 404, "not found")
1218
end
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
defprotocol Phoenix.Sync.Adapter.PlugApi do
22
@moduledoc false
33

4+
@type response() :: term()
5+
46
@spec predefined_shape(t(), Phoenix.Sync.PredefinedShape.t()) :: {:ok, t()} | {:error, term()}
57
def predefined_shape(api, shape)
68

79
@spec call(t(), Plug.Conn.t(), Plug.Conn.params()) :: Plug.Conn.t()
810
def call(api, conn, params)
11+
12+
@spec response(t(), Plug.Conn.t(), Plug.Conn.params()) :: response()
13+
def response(api, conn, params)
14+
15+
@spec send_response(t(), Plug.Conn.t(), response()) :: Plug.Conn.t()
16+
def send_response(api, conn, response)
917
end

lib/phoenix/sync/controller.ex

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,16 @@ defmodule Phoenix.Sync.Controller do
8484
)
8585

8686
def sync_render(conn, shape_fun) when is_function(shape_fun, 0) do
87+
conn = Plug.Conn.fetch_query_params(conn)
88+
8789
conn
8890
|> Phoenix.Sync.Controller.configure_plug_conn!(@plug_assign_opts)
8991
|> Phoenix.Sync.Controller.sync_render(conn.params, shape_fun)
9092
end
9193

9294
def sync_render(conn, shape, shape_opts \\ []) do
95+
conn = Plug.Conn.fetch_query_params(conn)
96+
9397
conn
9498
|> Phoenix.Sync.Controller.configure_plug_conn!(@plug_assign_opts)
9599
|> Phoenix.Sync.Controller.sync_render(conn.params, shape, shape_opts)
@@ -248,7 +252,7 @@ defmodule Phoenix.Sync.Controller do
248252

249253
{:ok, pid} =
250254
Task.start_link(fn ->
251-
send(parent, {:response, self(), Adapter.PlugApi.call(shape_api, conn, params)})
255+
send(parent, {:response, self(), Adapter.PlugApi.response(shape_api, conn, params)})
252256
end)
253257

254258
ref = Process.monitor(pid)
@@ -270,10 +274,10 @@ defmodule Phoenix.Sync.Controller do
270274

271275
interruptible_call(conn, api, params, shape_fun)
272276

273-
{:response, ^pid, conn} ->
277+
{:response, ^pid, response} ->
274278
Process.demonitor(ref, [:flush])
275279

276-
conn
280+
Adapter.PlugApi.send_response(shape_api, conn, response)
277281

278282
{:DOWN, ^ref, :process, _pid, reason} ->
279283
Plug.Conn.send_resp(conn, 500, inspect(reason))

lib/phoenix/sync/electric.ex

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -623,9 +623,25 @@ defmodule Phoenix.Sync.Electric do
623623
def map_response_body(msgs, _mapper) do
624624
msgs
625625
end
626+
627+
if @electric_available? do
628+
@doc false
629+
# for the embedded api we need to make sure that the response stream is consumed
630+
# in the same process that made the request, in order for cleanups to happen
631+
# so we have to enumerate the body stream immediately before passing the response
632+
# to any other process...
633+
def consume_response_stream(%Electric.Shapes.Api.Response{} = response) do
634+
Map.update!(response, :body, &do_consume_stream(&1))
635+
end
636+
637+
defp do_consume_stream(body) do
638+
Enum.to_list(body)
639+
end
640+
end
626641
end
627642

628-
if Code.ensure_loaded?(Electric.Shapes.Api) do
643+
if Code.ensure_loaded?(Electric.Shapes.Api) &&
644+
Code.ensure_loaded?(Phoenix.Sync.Electric.ApiAdapter) do
629645
defimpl Phoenix.Sync.Adapter.PlugApi, for: Electric.Shapes.Api do
630646
alias Electric.Shapes
631647

@@ -672,6 +688,27 @@ if Code.ensure_loaded?(Electric.Shapes.Api) do
672688
Shapes.Api.options(conn)
673689
end
674690

691+
def response(api, _conn, params) do
692+
case Shapes.Api.validate(api, params) do
693+
{:ok, request} ->
694+
{
695+
request,
696+
Shapes.Api.serve_shape_log(request) |> Phoenix.Sync.Electric.consume_response_stream()
697+
}
698+
699+
{:error, response} ->
700+
{nil, response}
701+
end
702+
end
703+
704+
def send_response(%ApiAdapter{}, conn, {request, response}) do
705+
conn
706+
|> content_type()
707+
|> Plug.Conn.assign(:request, request)
708+
|> Plug.Conn.assign(:response, response)
709+
|> Shapes.Api.Response.send(response)
710+
end
711+
675712
defp content_type(conn) do
676713
Plug.Conn.put_resp_content_type(conn, "application/json")
677714
end

lib/phoenix/sync/electric/api_adapter.ex

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,34 @@ if Code.ensure_loaded?(Electric.Shapes.Api) do
5050
Phoenix.Sync.Adapter.PlugApi.call(api, conn, params)
5151
end
5252

53+
# only works if method is GET...
54+
def response(%ApiAdapter{api: api, shape: shape}, %{method: "GET"} = conn, params) do
55+
if transform_fun = PredefinedShape.transform_fun(shape) do
56+
case Shapes.Api.validate(api, params) do
57+
{:ok, request} ->
58+
response = Shapes.Api.serve_shape_log(request)
59+
response = Map.update!(response, :body, &apply_transform(&1, transform_fun))
60+
{request, response}
61+
62+
{:error, response} ->
63+
{nil, response}
64+
end
65+
else
66+
Phoenix.Sync.Adapter.PlugApi.response(api, conn, params)
67+
|> then(fn {request, response} ->
68+
{request, Phoenix.Sync.Electric.consume_response_stream(response)}
69+
end)
70+
end
71+
end
72+
73+
def send_response(%ApiAdapter{}, conn, {request, response}) do
74+
conn
75+
|> content_type()
76+
|> Plug.Conn.assign(:request, request)
77+
|> Plug.Conn.assign(:response, response)
78+
|> Shapes.Api.Response.send(response)
79+
end
80+
5381
defp content_type(conn) do
5482
Plug.Conn.put_resp_content_type(conn, "application/json")
5583
end

lib/phoenix/sync/electric/client_adapter.ex

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,39 +18,63 @@ defmodule Phoenix.Sync.Electric.ClientAdapter do
1818
}}
1919
end
2020

21+
def call(sync_client, conn, params) do
22+
{request, shape} = request(sync_client, conn, params)
23+
24+
fetch_upstream(sync_client, conn, request, shape)
25+
end
26+
27+
def response(sync_client, %{method: "GET"} = conn, params) do
28+
{request, shape} = request(sync_client, conn, params)
29+
30+
make_request(sync_client, conn, request, shape)
31+
end
32+
33+
def send_response(_sync_client, conn, response) do
34+
conn
35+
|> put_resp_headers(response.headers)
36+
|> Plug.Conn.send_resp(response.status, response.body)
37+
end
38+
2139
# this is the server-defined shape route, so we want to only pass on the
2240
# per-request/stream position params leaving the shape-definition params
2341
# from the configured client.
24-
def call(%{shape_definition: %PredefinedShape{} = shape} = sync_client, conn, params) do
25-
request =
42+
defp request(%{shape_definition: %PredefinedShape{} = shape} = sync_client, _conn, params) do
43+
{
2644
Client.request(
2745
sync_client.client,
2846
method: :get,
2947
offset: params["offset"],
3048
shape_handle: params["handle"],
3149
live: live?(params["live"]),
3250
next_cursor: params["cursor"]
33-
)
34-
35-
fetch_upstream(sync_client, conn, request, shape)
51+
),
52+
shape
53+
}
3654
end
3755

3856
# this version is the pure client-defined shape version
39-
def call(sync_client, %{method: method} = conn, params) do
40-
request =
57+
defp request(sync_client, %{method: method} = _conn, params) do
58+
{
4159
Client.request(
4260
sync_client.client,
4361
method: normalise_method(method),
4462
params: params
45-
)
46-
47-
fetch_upstream(sync_client, conn, request, nil)
63+
),
64+
nil
65+
}
4866
end
4967

5068
defp normalise_method(method), do: method |> String.downcase() |> String.to_atom()
5169
defp live?(live), do: live == "true"
5270

5371
defp fetch_upstream(sync_client, conn, request, shape) do
72+
response = make_request(sync_client, conn, request, shape)
73+
74+
send_response(sync_client, conn, response)
75+
end
76+
77+
defp make_request(sync_client, conn, request, shape) do
5478
request = put_req_headers(request, conn.req_headers)
5579

5680
response =
@@ -69,9 +93,7 @@ defmodule Phoenix.Sync.Electric.ClientAdapter do
6993
response.body
7094
end
7195

72-
conn
73-
|> put_resp_headers(response.headers)
74-
|> Plug.Conn.send_resp(response.status, body)
96+
%{response | body: body}
7597
end
7698

7799
defp put_req_headers(request, headers) do

lib/phoenix/sync/sandbox/api_adapter.ex

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,24 @@ if Phoenix.Sync.sandbox_enabled?() do
2626
end)
2727
end
2828

29+
def response(%{shape: nil} = _adapter, conn, params) do
30+
shape_api = lookup_api!()
31+
PlugApi.response(shape_api, conn, params)
32+
end
33+
34+
def response(%{shape: shape} = _adapter, conn, params) do
35+
shape_api = lookup_api!()
36+
37+
Phoenix.Sync.Electric.api_predefined_shape(conn, shape_api, shape, fn conn, shape_api ->
38+
PlugApi.response(shape_api, conn, params)
39+
end)
40+
end
41+
42+
def send_response(_adapter, conn, response) do
43+
shape_api = lookup_api!()
44+
PlugApi.send_response(shape_api, conn, response)
45+
end
46+
2947
defp lookup_api!() do
3048
Phoenix.Sync.Sandbox.retrieve_api!()
3149
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ defmodule Phoenix.Sync.MixProject do
33

44
# Remember to update the README when you change the version
55
@version "0.6.0"
6-
@electric_version "~> 1.1.9 and >= 1.1.9"
6+
@electric_version ">= 1.1.9 and <= 1.1.10"
77

88
def project do
99
[

0 commit comments

Comments
 (0)