Skip to content

Commit 02f7958

Browse files
magnetisedthruflo
andauthored
Add support for changeset shapes (#70)
Also clean up the shape definition path, removing Phoenix.Sync from the chain of "things that understand shapes" so now if you change the client's view of a shape it should propagate to Phoenix.Sync without changes --------- Co-authored-by: James Arthur <[email protected]>
1 parent 43b6a8c commit 02f7958

File tree

16 files changed

+1339
-188
lines changed

16 files changed

+1339
-188
lines changed

lib/phoenix/sync.ex

Lines changed: 184 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,39 @@ defmodule Phoenix.Sync do
77

88
alias Electric.Client.ShapeDefinition
99

10+
alias Phoenix.Sync.PredefinedShape
11+
1012
@shape_keys [:namespace, :where, :columns]
1113
@shape_params @shape_keys |> Enum.map(&to_string/1)
1214

13-
@type shape_specification :: [
14-
unquote(NimbleOptions.option_typespec(Phoenix.Sync.PredefinedShape.schema()))
15+
@type shape_options :: [
16+
unquote(NimbleOptions.option_typespec(PredefinedShape.schema()))
1517
]
16-
@type shape_definition ::
17-
String.t()
18-
| Ecto.Queryable.t()
19-
| shape_specification()
18+
19+
if Code.ensure_loaded?(Ecto) do
20+
@type shape_definition ::
21+
String.t()
22+
| Ecto.Queryable.t()
23+
| shape_options()
24+
else
25+
@type shape_definition() :: shape_options()
26+
end
27+
2028
@type param_override ::
2129
{:namespace, String.t()}
2230
| {:table, String.t()}
2331
| {:where, String.t()}
2432
| {:columns, String.t()}
2533
@type param_overrides :: [param_override()]
2634

35+
@type match_shape_params() :: %{
36+
table: String.t(),
37+
namespace: nil | String.t(),
38+
where: nil | String.t(),
39+
params: nil | %{String.t() => String.t()},
40+
columns: nil | [String.t(), ...]
41+
}
42+
2743
@doc """
2844
Returns the required adapter configuration for your Phoenix Endpoint or
2945
`Plug.Router`.
@@ -165,4 +181,166 @@ defmodule Phoenix.Sync do
165181
{:error, "Missing `table` parameter"}
166182
end
167183
end
184+
185+
@doc """
186+
Interrupts all long-polling requests matching the given shape definition.
187+
188+
The broader the shape definition, the more requests will be interrupted.
189+
190+
Returns the number of interrupted requests.
191+
192+
### Examples
193+
194+
To interrupt all shapes on the `todos` table:
195+
196+
Phoenix.Sync.interrupt("todos")
197+
Phoenix.Sync.interrupt(table: "todos")
198+
199+
or the same using an `Ecto.Schema` module:
200+
201+
Phoenix.Sync.interrupt(Todos.Todo)
202+
203+
all shapes with the given parameterized where clause:
204+
205+
Phoenix.Sync.interrupt(table: "todos", where: "user_id = $1")
206+
207+
or a single shape for the given user:
208+
209+
Phoenix.Sync.interrupt(
210+
from(t in Todos.Todo, where: t.user_id == ^user_id)
211+
)
212+
213+
# or
214+
215+
Phoenix.Sync.interrupt(
216+
table: "todos",
217+
where: "user_id = $1",
218+
params: [user_id]
219+
)
220+
221+
# or
222+
223+
Phoenix.Sync.interrupt(
224+
table: "todos",
225+
where: "user_id = '\#{user_id}'"
226+
)
227+
228+
If you want more control over the match, you can pass a function that will
229+
receive a normalized shape definition and should return `true` if the active
230+
shape matches.
231+
232+
Phoenix.Sync.interrupt(fn %{table: _, where: _, params: _} = shape ->
233+
shape.table == "todos" &&
234+
shape.where == "user_id = $1" &&
235+
shape.params["0"] == user_id
236+
end)
237+
238+
The normalized shape argument is a map with the following keys:
239+
240+
- `table`, e.g. `"todos"`
241+
- `namespace`, e.g. `"public"`
242+
- `where`, e.g. `"where user_id = $1"`
243+
- `params`, a map of argument position to argument value, e.g. `%{"0" => "true", "1" => "..."}`
244+
- `columns`, e.g. `["id", "title"]`
245+
246+
All except `table` may be `nil`.
247+
248+
### Interrupting Ecto Query-based Shapes
249+
250+
Be careful when mixing `Ecto` query-based shapes with interrupt calls using
251+
hand-written where clauses.
252+
253+
The shape
254+
255+
Phoenix.Sync.Controller.sync_stream(conn, params, fn ->
256+
from(t in Todos.Todo, where: t.user_id == ^user_id)
257+
end)
258+
259+
will **not** be interrupted by
260+
261+
Phoenix.Sync.interrupt(
262+
table: "todos",
263+
where: "user_id = '\#{user_id}'"
264+
)
265+
266+
because the where clause matching is a simple *exact string* match and `Ecto` query
267+
generated where clauses will generally be different from the equivalent
268+
hand-written version. If you want to interrupt a query-based shape you should
269+
use the same query as the interrupt criteria.
270+
271+
> #### Writing interrupts {: .tip}
272+
>
273+
> It's better to be too broad with your interrupt calls than too narrow.
274+
> Only clients whose shape definition changes after the `interrupt/1` call
275+
> will be affected.
276+
277+
## Supported options
278+
279+
The more options you give the more specific the interrupt call will be. Only
280+
the table name is required.
281+
282+
- `table` - Required. Interrupts all shapes matching the given table. E.g. `"todos"`
283+
- `namespace` - The table namespace. E.g. `"public"`
284+
- `where` - The shape's where clause. Can in be parameterized and will match
285+
all shapes with the same where filter irrespective of the parameters (unless
286+
provided). E.g. `"status = $1"`, `"completed = true"`
287+
- `columns` - The columns included in the shape. E.g. `["id", "title", "completed"]`
288+
- `params` - The values associated with a parameterized where clause. E.g. `[true, 1, "alive"]`, `%{1 => true}`
289+
"""
290+
@spec interrupt(shape_definition() | (match_shape_params() -> boolean()), shape_options()) ::
291+
{:ok, non_neg_integer()}
292+
def interrupt(shape, shape_opts \\ []) do
293+
Phoenix.Sync.ShapeRequestRegistry.interrupt_matching(shape, shape_opts)
294+
end
295+
296+
@doc """
297+
Returns a shape definition for the given params.
298+
299+
## Examples
300+
301+
- An `Ecto.Schema` module:
302+
303+
Phoenix.Sync.shape!(MyPlugApp.Todos.Todo)
304+
305+
- An `Ecto` query:
306+
307+
Phoenix.Sync.shape!(from(t in Todos.Todo, where: t.owner_id == ^user_id))
308+
309+
- A `changeset/1` function which defines the table and columns:
310+
311+
Phoenix.Sync.shape!(&Todos.Todo.changeset/1)
312+
313+
- A `changeset/1` function plus a where clause:
314+
315+
Phoenix.Sync.shape!(
316+
&Todos.Todo.changeset/1,
317+
where: "completed = false"
318+
)
319+
320+
or a parameterized where clause:
321+
322+
Phoenix.Sync.shape!(
323+
&Todos.Todo.changeset/1,
324+
where: "completed = $1", params: [false]
325+
)
326+
327+
- A keyword list defining the shape parameters:
328+
329+
Phoenix.Sync.shape!(
330+
table: "todos",
331+
namespace: "my_app",
332+
where: "completed = $1",
333+
params: [false]
334+
)
335+
336+
## Options
337+
338+
When defining a shape via a keyword list, it supports the following options:
339+
340+
#{NimbleOptions.docs(PredefinedShape.schema())}
341+
"""
342+
@spec shape!(shape_definition(), shape_options()) :: PredefinedShape.t()
343+
def shape!(shape, shape_opts \\ []) do
344+
PredefinedShape.new!(shape, shape_opts)
345+
end
168346
end

lib/phoenix/sync/application.ex

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ defmodule Phoenix.Sync.Application do
99

1010
@impl true
1111
def start(_type, _args) do
12+
base_children = [Phoenix.Sync.ShapeRequestRegistry]
13+
1214
children =
1315
case children() do
1416
{:ok, children} ->
@@ -19,7 +21,10 @@ defmodule Phoenix.Sync.Application do
1921
[]
2022
end
2123

22-
Supervisor.start_link(children, strategy: :one_for_one, name: Phoenix.Sync.Supervisor)
24+
Supervisor.start_link(base_children ++ children,
25+
strategy: :one_for_one,
26+
name: Phoenix.Sync.Supervisor
27+
)
2328
end
2429

2530
@doc false

lib/phoenix/sync/client.ex

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -124,26 +124,16 @@ defmodule Phoenix.Sync.Client do
124124
end
125125

126126
@doc false
127-
def stream(shape, stream_opts, sync_opts) do
128-
client = new!(sync_opts)
129-
{shape, shape_stream_opts} = resolve_shape(shape)
130-
Electric.Client.stream(client, shape, Keyword.merge(shape_stream_opts, stream_opts))
127+
# used for testing. `config` replace the application configuration
128+
def stream(shape, stream_opts, config) do
129+
client = new!(config)
130+
{shape, stream_opts} = resolve_shape(shape, stream_opts)
131+
Electric.Client.stream(client, shape, stream_opts)
131132
end
132133

133-
defp resolve_shape(table) when is_binary(table) do
134-
{table, []}
135-
end
136-
137-
defp resolve_shape(definition) when is_list(definition) do
138-
shape = PredefinedShape.new!(definition)
139-
PredefinedShape.to_stream_params(shape)
140-
end
141-
142-
defp resolve_shape(schema) when is_atom(schema) do
143-
{schema, []}
144-
end
145-
146-
defp resolve_shape(%Ecto.Query{} = query) do
147-
{query, []}
134+
defp resolve_shape(shape, stream_opts) do
135+
shape
136+
|> PredefinedShape.new!(stream_opts)
137+
|> Phoenix.Sync.PredefinedShape.to_stream_params()
148138
end
149139
end

0 commit comments

Comments
 (0)