Skip to content

Commit c5aaa54

Browse files
committed
[wip]
1 parent 828debd commit c5aaa54

File tree

16 files changed

+1048
-123
lines changed

16 files changed

+1048
-123
lines changed

lib/phoenix/sync/application.ex

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ defmodule Phoenix.Sync.Application do
99

1010
@impl true
1111
def start(_type, _args) do
12+
base_children = [Phoenix.Sync.Shape.Supervisor]
13+
1214
children =
1315
case children() do
1416
{:ok, children} ->
@@ -19,7 +21,10 @@ defmodule Phoenix.Sync.Application do
1921
[]
2022
end
2123

22-
Supervisor.start_link(children, strategy: :one_for_one, name: Phoenix.Sync.Supervisor)
24+
Supervisor.start_link(base_children ++ children,
25+
strategy: :one_for_one,
26+
name: Phoenix.Sync.Supervisor
27+
)
2328
end
2429

2530
@doc false

lib/phoenix/sync/client.ex

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,13 @@ defmodule Phoenix.Sync.Client do
106106
columns: ["id", "title"]
107107
)
108108
109+
# consumer a pre-defined shape from a remote server, e.g.
110+
# one defined using `Phoenix.Sync.Controller.sync_render/3` or
111+
# `Phoenix.Sync.Router.sync/2`.
112+
stream = Phoenix.Sync.Client.stream(
113+
"https://myapp.com/sync/todos?user_id=1234"
114+
)
115+
109116
# once you have a stream, consume it as usual
110117
Enum.each(stream, &IO.inspect/1)
111118
@@ -116,18 +123,30 @@ defmodule Phoenix.Sync.Client do
116123
Elixir/Ecto types, rather than raw column data in the form `%{"column_name"
117124
=> "column_value"}`.
118125
"""
126+
# stream(Todo, replica: full, client: client)
127+
# stream("todos", where: "...", replica: full, client: client)
128+
# stream(table: "todos", where: "...", replica: full, client: client)
119129
@spec stream(Phoenix.Sync.shape_definition(), Electric.Client.stream_options()) :: Enum.t()
120130
def stream(shape, stream_opts \\ [])
121131

122-
def stream(shape, stream_opts) do
123-
stream(shape, stream_opts, nil)
132+
def stream(shape, []) when is_list(shape) do
133+
{client, shape} = Keyword.pop_lazy(shape, :client, &new!/0)
134+
135+
{shape, shape_stream_opts} = resolve_shape(shape)
136+
137+
Electric.Client.stream(client, shape, shape_stream_opts)
138+
end
139+
140+
def stream(table, stream_opts) when is_binary(table) and is_list(stream_opts) do
141+
stream(Keyword.put(stream_opts, :table, table), [])
124142
end
125143

126-
@doc false
127-
def stream(shape, stream_opts, sync_opts) do
128-
client = new!(sync_opts)
144+
def stream(shape, stream_opts) when not is_list(shape) and is_list(stream_opts) do
145+
{client, stream_opts} = Keyword.pop_lazy(stream_opts, :client, &new!/0)
146+
129147
{shape, shape_stream_opts} = resolve_shape(shape)
130-
Electric.Client.stream(client, shape, Keyword.merge(shape_stream_opts, stream_opts))
148+
stream_opts = Keyword.merge(shape_stream_opts, stream_opts)
149+
Electric.Client.stream(client, shape, stream_opts)
131150
end
132151

133152
defp resolve_shape(table) when is_binary(table) do

lib/phoenix/sync/predefined_shape.ex

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,13 @@ defmodule Phoenix.Sync.PredefinedShape do
3131
def schema, do: @schema
3232
def keys, do: @keys
3333

34+
def is_queryable?(schema) when is_atom(schema) do
35+
Code.ensure_loaded?(schema) && function_exported?(schema, :__schema__, 1) &&
36+
!is_nil(schema.__schema__(:source))
37+
end
38+
39+
def is_queryable?(q) when is_struct(q, Ecto.Query) or is_struct(q, Ecto.Changeset), do: true
40+
3441
def new!(opts, config \\ [])
3542

3643
def new!(shape, opts) when is_list(opts) and is_list(shape) do

lib/phoenix/sync/sandbox.ex

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,11 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
187187

188188
alias __MODULE__
189189

190+
defmodule Error do
191+
@moduledoc false
192+
defexception [:message]
193+
end
194+
190195
@type start_opts() :: [{:shared, boolean()}]
191196

192197
@registry __MODULE__.Registry
@@ -282,7 +287,7 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
282287

283288
stack_id ->
284289
case GenServer.whereis(__MODULE__.Stack.name(stack_id)) do
285-
nil -> raise RuntimeError, message: "no stack found for #{inspect(stack_id)}"
290+
nil -> raise Error, message: "no stack found for #{inspect(stack_id)}"
286291
_pid -> :ok
287292
end
288293
end
@@ -330,7 +335,8 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
330335

331336
@doc false
332337
def stack_id! do
333-
stack_id() || raise "No stack_id found. Did you call Phoenix.Sync.Sandbox.start!/1?"
338+
stack_id() ||
339+
raise Error, message: "No stack_id found. Did you call Phoenix.Sync.Sandbox.start!/1?"
334340
end
335341

336342
@doc false
@@ -344,14 +350,15 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
344350
# has even started
345351
case GenServer.whereis(Sandbox.StackRegistry) do
346352
nil ->
347-
raise """
348-
Phoenix.Sync.Sandbox is not running. Have you set the mode to `:sandbox` in `config/test.exs`?
353+
raise Error,
354+
message: """
355+
Phoenix.Sync.Sandbox is not running. Have you set the mode to `:sandbox` in `config/test.exs`?
349356
350-
# config/test.exs
351-
config :phoenix_sync,
352-
env: config_env(),
353-
mode: :sandbox
354-
"""
357+
# config/test.exs
358+
config :phoenix_sync,
359+
env: config_env(),
360+
mode: :sandbox
361+
"""
355362

356363
registry_pid when is_pid(registry_pid) ->
357364
pids
@@ -380,16 +387,19 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
380387
|> lookup_stack_id()
381388
|> case do
382389
nil ->
383-
raise RuntimeError,
384-
"No stack_id found for process #{inspect(parent)}. Did you call Phoenix.Sync.Sandbox.start!/1?"
390+
raise Error,
391+
message:
392+
"No stack_id found for process #{inspect(parent)}. Did you call Phoenix.Sync.Sandbox.start!/1?"
385393

386394
stack_id ->
387395
case GenServer.whereis(name_or_pid) do
388396
pid when is_pid(pid) ->
389397
Sandbox.StackRegistry.register(pid, stack_id)
390398

391399
other ->
392-
raise "`allow/4` expects a PID or a locally registered process name but lookup returned: #{inspect(other)}"
400+
raise Error,
401+
message:
402+
"`allow/4` expects a PID or a locally registered process name but lookup returned: #{inspect(other)}"
393403
end
394404
end
395405
end
@@ -424,7 +434,7 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
424434
if stack_id = stack_id() do
425435
Sandbox.StackRegistry.get_client(stack_id)
426436
else
427-
{:error, "No stack_id found. Did you call Phoenix.Sync.Sandbox.start!/1?"}
437+
raise Error, message: "No stack_id found. Did you call Phoenix.Sync.Sandbox.start!/1?"
428438
end
429439
end
430440

@@ -439,6 +449,17 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
439449
client
440450
end
441451

452+
def must_refetch!(shape) do
453+
stack_id = stack_id!()
454+
455+
{%{namespace: namespace, table: table}, _} =
456+
shape
457+
|> Phoenix.Sync.PredefinedShape.new!()
458+
|> Phoenix.Sync.PredefinedShape.to_stream_params()
459+
460+
Sandbox.Producer.truncate(stack_id, {namespace, table})
461+
end
462+
442463
@impl true
443464
@doc false
444465
def init(_) do

lib/phoenix/sync/sandbox/producer.ex

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
66
Transaction,
77
NewRecord,
88
UpdatedRecord,
9-
DeletedRecord
9+
DeletedRecord,
10+
TruncatedRelation
1011
}
1112

1213
alias Electric.Replication.LogOffset
@@ -33,6 +34,10 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
3334
GenServer.cast(name(stack_id), {:emit_changes, changes})
3435
end
3536

37+
def truncate(stack_id, relation) do
38+
GenServer.cast(name(stack_id), {:truncate, relation})
39+
end
40+
3641
def name(stack_id) do
3742
Phoenix.Sync.Sandbox.name({__MODULE__, stack_id})
3843
end
@@ -60,6 +65,17 @@ if Code.ensure_loaded?(Ecto.Adapters.SQL.Sandbox) do
6065
{:noreply, %{state | txid: next_txid}}
6166
end
6267

68+
def handle_cast({:truncate, relation}, state) do
69+
changes = [%TruncatedRelation{relation: relation}]
70+
71+
:ok =
72+
state.txid
73+
|> transaction(changes)
74+
|> ShapeLogCollector.store_transaction(ShapeLogCollector.name(state.stack_id))
75+
76+
{:noreply, %{state | txid: state.txid + 100}}
77+
end
78+
6379
defp transaction(txid, changes) do
6480
%Transaction{
6581
xid: txid,

0 commit comments

Comments
 (0)