Skip to content

Commit 726a148

Browse files
committed
fix: A send plug response within the request process
Bandit (and Electric) require responses to be consumed/sent within the original request process. Fixes #110
1 parent 06130c6 commit 726a148

File tree

7 files changed

+135
-17
lines changed

7 files changed

+135
-17
lines changed

apps/plug_sync/lib/plug_sync/router.ex

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
defmodule PlugSync.Router do
22
use Plug.Router, copy_opts_to_assign: :options
33
use Phoenix.Sync.Router
4+
use Phoenix.Sync.Controller
45

6+
plug Plug.Logger, log: :debug
57
plug :match
68
plug :dispatch
79

810
sync "/items-mapped", table: "items", transform: &PlugSync.Router.map_item/1
911

12+
get "/items-interruptible" do
13+
sync_render(conn, fn -> [table: "items"] end)
14+
end
15+
1016
match _ do
1117
send_resp(conn, 404, "not found")
1218
end
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
defprotocol Phoenix.Sync.Adapter.PlugApi do
22
@moduledoc false
33

4+
@type response() :: term()
5+
46
@spec predefined_shape(t(), Phoenix.Sync.PredefinedShape.t()) :: {:ok, t()} | {:error, term()}
57
def predefined_shape(api, shape)
68

79
@spec call(t(), Plug.Conn.t(), Plug.Conn.params()) :: Plug.Conn.t()
810
def call(api, conn, params)
11+
12+
@spec response(t(), Plug.Conn.t(), Plug.Conn.params()) :: response()
13+
def response(api, conn, params)
14+
15+
@spec send_response(t(), Plug.Conn.t(), response()) :: Plug.Conn.t()
16+
def send_response(api, conn, response)
917
end

lib/phoenix/sync/controller.ex

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ defmodule Phoenix.Sync.Controller do
252252

253253
{:ok, pid} =
254254
Task.start_link(fn ->
255-
send(parent, {:response, self(), Adapter.PlugApi.call(shape_api, conn, params)})
255+
send(parent, {:response, self(), Adapter.PlugApi.response(shape_api, conn, params)})
256256
end)
257257

258258
ref = Process.monitor(pid)
@@ -274,10 +274,10 @@ defmodule Phoenix.Sync.Controller do
274274

275275
interruptible_call(conn, api, params, shape_fun)
276276

277-
{:response, ^pid, conn} ->
277+
{:response, ^pid, response} ->
278278
Process.demonitor(ref, [:flush])
279279

280-
conn
280+
Adapter.PlugApi.send_response(shape_api, conn, response)
281281

282282
{:DOWN, ^ref, :process, _pid, reason} ->
283283
Plug.Conn.send_resp(conn, 500, inspect(reason))

lib/phoenix/sync/electric.ex

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -623,9 +623,24 @@ defmodule Phoenix.Sync.Electric do
623623
def map_response_body(msgs, _mapper) do
624624
msgs
625625
end
626+
627+
if @electric_available? do
628+
@doc false
629+
# for the embedded api we need to make sure that the response stream is consumed
630+
# in the same process that made the request, in order for cleanups to happen
631+
# so we have to enumerate the body stream immediately before passing the response
632+
# to any other process...
633+
def consume_response_stream(%Electric.Shapes.Api.Response{} = response) do
634+
Map.update!(response, :body, &do_consume_stream(&1))
635+
end
636+
637+
defp do_consume_stream(body) do
638+
Enum.reduce(body, [], &[&2, &1])
639+
end
640+
end
626641
end
627642

628-
if Code.ensure_loaded?(Electric.Shapes.Api) do
643+
if Code.ensure_loaded?(Electric.Shapes.Api) && Code.ensure_loaded?(Phoenix.Sync.Electric.ApiAdapter) do
629644
defimpl Phoenix.Sync.Adapter.PlugApi, for: Electric.Shapes.Api do
630645
alias Electric.Shapes
631646

@@ -672,6 +687,27 @@ if Code.ensure_loaded?(Electric.Shapes.Api) do
672687
Shapes.Api.options(conn)
673688
end
674689

690+
def response(api, _conn, params) do
691+
case Shapes.Api.validate(api, params) do
692+
{:ok, request} ->
693+
{
694+
request,
695+
Shapes.Api.serve_shape_log(request) |> Phoenix.Sync.Electric.consume_response_stream()
696+
}
697+
698+
{:error, response} ->
699+
{nil, response}
700+
end
701+
end
702+
703+
def send_response(%ApiAdapter{}, conn, {request, response}) do
704+
conn
705+
|> content_type()
706+
|> Plug.Conn.assign(:request, request)
707+
|> Plug.Conn.assign(:response, response)
708+
|> Shapes.Api.Response.send(response)
709+
end
710+
675711
defp content_type(conn) do
676712
Plug.Conn.put_resp_content_type(conn, "application/json")
677713
end

lib/phoenix/sync/electric/api_adapter.ex

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,34 @@ if Code.ensure_loaded?(Electric.Shapes.Api) do
5050
Phoenix.Sync.Adapter.PlugApi.call(api, conn, params)
5151
end
5252

53+
# only works if method is GET...
54+
def response(%ApiAdapter{api: api, shape: shape}, %{method: "GET"} = conn, params) do
55+
if transform_fun = PredefinedShape.transform_fun(shape) do
56+
case Shapes.Api.validate(api, params) do
57+
{:ok, request} ->
58+
response = Shapes.Api.serve_shape_log(request)
59+
response = Map.update!(response, :body, &apply_transform(&1, transform_fun))
60+
{request, response}
61+
62+
{:error, response} ->
63+
{nil, response}
64+
end
65+
else
66+
Phoenix.Sync.Adapter.PlugApi.response(api, conn, params)
67+
|> then(fn {request, response} ->
68+
{request, Phoenix.Sync.Electric.consume_response_stream(response)}
69+
end)
70+
end
71+
end
72+
73+
def send_response(%ApiAdapter{}, conn, {request, response}) do
74+
conn
75+
|> content_type()
76+
|> Plug.Conn.assign(:request, request)
77+
|> Plug.Conn.assign(:response, response)
78+
|> Shapes.Api.Response.send(response)
79+
end
80+
5381
defp content_type(conn) do
5482
Plug.Conn.put_resp_content_type(conn, "application/json")
5583
end

lib/phoenix/sync/electric/client_adapter.ex

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,39 +18,63 @@ defmodule Phoenix.Sync.Electric.ClientAdapter do
1818
}}
1919
end
2020

21+
def call(sync_client, conn, params) do
22+
{request, shape} = request(sync_client, conn, params)
23+
24+
fetch_upstream(sync_client, conn, request, shape)
25+
end
26+
27+
def response(sync_client, %{method: "GET"} = conn, params) do
28+
{request, shape} = request(sync_client, conn, params)
29+
30+
make_request(sync_client, conn, request, shape)
31+
end
32+
33+
def send_response(_sync_client, conn, response) do
34+
conn
35+
|> put_resp_headers(response.headers)
36+
|> Plug.Conn.send_resp(response.status, response.body)
37+
end
38+
2139
# this is the server-defined shape route, so we want to only pass on the
2240
# per-request/stream position params leaving the shape-definition params
2341
# from the configured client.
24-
def call(%{shape_definition: %PredefinedShape{} = shape} = sync_client, conn, params) do
25-
request =
42+
defp request(%{shape_definition: %PredefinedShape{} = shape} = sync_client, _conn, params) do
43+
{
2644
Client.request(
2745
sync_client.client,
2846
method: :get,
2947
offset: params["offset"],
3048
shape_handle: params["handle"],
3149
live: live?(params["live"]),
3250
next_cursor: params["cursor"]
33-
)
34-
35-
fetch_upstream(sync_client, conn, request, shape)
51+
),
52+
shape
53+
}
3654
end
3755

3856
# this version is the pure client-defined shape version
39-
def call(sync_client, %{method: method} = conn, params) do
40-
request =
57+
defp request(sync_client, %{method: method} = _conn, params) do
58+
{
4159
Client.request(
4260
sync_client.client,
4361
method: normalise_method(method),
4462
params: params
45-
)
46-
47-
fetch_upstream(sync_client, conn, request, nil)
63+
),
64+
nil
65+
}
4866
end
4967

5068
defp normalise_method(method), do: method |> String.downcase() |> String.to_atom()
5169
defp live?(live), do: live == "true"
5270

5371
defp fetch_upstream(sync_client, conn, request, shape) do
72+
response = make_request(sync_client, conn, request, shape)
73+
74+
send_response(sync_client, conn, response)
75+
end
76+
77+
defp make_request(sync_client, conn, request, shape) do
5478
request = put_req_headers(request, conn.req_headers)
5579

5680
response =
@@ -69,9 +93,7 @@ defmodule Phoenix.Sync.Electric.ClientAdapter do
6993
response.body
7094
end
7195

72-
conn
73-
|> put_resp_headers(response.headers)
74-
|> Plug.Conn.send_resp(response.status, body)
96+
%{response | body: body}
7597
end
7698

7799
defp put_req_headers(request, headers) do

lib/phoenix/sync/sandbox/api_adapter.ex

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,24 @@ if Phoenix.Sync.sandbox_enabled?() do
2626
end)
2727
end
2828

29+
def response(%{shape: nil} = _adapter, conn, params) do
30+
shape_api = lookup_api!()
31+
PlugApi.response(shape_api, conn, params)
32+
end
33+
34+
def response(%{shape: shape} = _adapter, conn, params) do
35+
shape_api = lookup_api!()
36+
37+
Phoenix.Sync.Electric.api_predefined_shape(conn, shape_api, shape, fn conn, shape_api ->
38+
PlugApi.response(shape_api, conn, params)
39+
end)
40+
end
41+
42+
def send_response(_adapter, conn, response) do
43+
shape_api = lookup_api!()
44+
PlugApi.send_response(shape_api, conn, response)
45+
end
46+
2947
defp lookup_api!() do
3048
Phoenix.Sync.Sandbox.retrieve_api!()
3149
end

0 commit comments

Comments
 (0)