Skip to content

Commit d2f9b2b

Browse files
authored
Add support for keyword-based sync_stream views (#95)
Fixes #94
1 parent ac40073 commit d2f9b2b

File tree

5 files changed

+216
-9
lines changed

5 files changed

+216
-9
lines changed

lib/phoenix/sync/live_view.ex

Lines changed: 102 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@ if Code.ensure_loaded?(Phoenix.Component) do
1717
{:noreply, sync_stream_update(socket, event)}
1818
end
1919
end
20+
21+
See `sync_stream/4` for more details.
2022
```
2123
"""
2224

2325
use Phoenix.Component
2426

2527
alias Electric.Client.Message
28+
alias Phoenix.Sync.PredefinedShape
2629

2730
require Record
2831

@@ -190,6 +193,40 @@ if Code.ensure_loaded?(Phoenix.Component) do
190193
{:ok, Phoenix.Sync.LiveView.sync_stream(socket, :users, User)}
191194
end
192195
end
196+
197+
## Keyword-based Shapes
198+
199+
`Ecto` is not required to use `sync_stream/4`. [Keyword-based
200+
shapes](../../../README.md#using-a-keyword-list) are possible but
201+
work a little differently.
202+
203+
def mount(_params, _session, socket) do
204+
socket =
205+
Phoenix.Sync.LiveView.sync_stream(
206+
socket,
207+
:admins,
208+
table: "users",
209+
where: "admin = true"
210+
)
211+
{:ok, socket}
212+
end
213+
214+
Without an underlying `Ecto.Schema` module to map the stream values, you
215+
will receive simple map values with string keys (plus a special
216+
`__sync_key__` value used for the live view
217+
[`dom_id`](https://hexdocs.pm/phoenix_live_view/Phoenix.LiveView.html#stream_configure/3)
218+
function.
219+
220+
So to use these in your template, you should be careful to use the
221+
`value["key"]` syntax to retrieve values. Using a keyword-based shape, The
222+
first example above becomes:
223+
224+
<div phx-update="stream">
225+
<div :for={{id, item} <- @streams.items} id={id}>
226+
<%= item["value"] %>
227+
</div>
228+
</div>
229+
193230
"""
194231
@spec sync_stream(
195232
socket :: Phoenix.LiveView.Socket.t(),
@@ -215,8 +252,9 @@ if Code.ensure_loaded?(Phoenix.Component) do
215252
end
216253
end)
217254

218-
Phoenix.LiveView.stream(
219-
socket,
255+
socket
256+
|> configure_live_stream(name, query)
257+
|> Phoenix.LiveView.stream(
220258
name,
221259
client_live_stream(client, name, query, component),
222260
stream_opts
@@ -226,6 +264,36 @@ if Code.ensure_loaded?(Phoenix.Component) do
226264
end
227265
end
228266

267+
defp configure_live_stream(socket, name, query) do
268+
if PredefinedShape.is_queryable?(query) do
269+
# if the shape is an Ecto.Struct then we can just fallback to the default
270+
# dom_id function (which is `value.id`)
271+
socket
272+
else
273+
# since we don't have control over the `mount` callback, but want to use
274+
# `stream_configure` correctly which means that the stream should only be
275+
# configured once, we do our own tracking of configured streams in the
276+
# socket assigns
277+
case socket.assigns do
278+
%{__sync_stream_config__: %{^name => true}} ->
279+
socket
280+
281+
assigns ->
282+
assigns =
283+
Map.update(
284+
assigns,
285+
:__sync_stream_config__,
286+
%{name => true},
287+
&Map.put(&1, name, true)
288+
)
289+
290+
Phoenix.LiveView.stream_configure(%{socket | assigns: assigns}, name,
291+
dom_id: &Map.fetch!(&1, :__sync_key__)
292+
)
293+
end
294+
end
295+
end
296+
229297
@doc """
230298
Handle Electric events within a LiveView.
231299
@@ -277,15 +345,42 @@ if Code.ensure_loaded?(Phoenix.Component) do
277345
defp client_live_stream(client, name, query, component) do
278346
pid = self()
279347

348+
shape =
349+
query
350+
|> PredefinedShape.new!()
351+
|> PredefinedShape.to_shape()
352+
280353
client
281-
|> Electric.Client.stream(query, live: false, replica: :full, errors: :stream)
354+
|> Electric.Client.stream(shape, live: false, replica: :full, errors: :stream)
355+
|> keyed_stream(query)
282356
|> Stream.transform(
283357
fn -> {[], nil} end,
284358
&live_stream_message/2,
285-
&update_mode(&1, {client, name, query, pid, component})
359+
&update_mode(&1, {client, name, query, shape, pid, component})
286360
)
287361
end
288362

363+
# when the stream is based off an Ecto.Schema struct then the default id
364+
# based dom-id function works. otherwise we can use the message key because
365+
# we **know** that is unique for a shape
366+
defp keyed_stream(base_stream, query) do
367+
if PredefinedShape.is_queryable?(query) do
368+
base_stream
369+
else
370+
Stream.map(base_stream, fn
371+
%Message.ChangeMessage{key: key, value: value} = msg ->
372+
%{msg | value: Map.put(value, :__sync_key__, sanitise_key(key))}
373+
374+
msg ->
375+
msg
376+
end)
377+
end
378+
end
379+
380+
defp sanitise_key(key) do
381+
key |> String.replace("\"", "") |> String.replace(" ", "%20")
382+
end
383+
289384
defp live_stream_message(
290385
%Message.ChangeMessage{headers: %{operation: :insert}, value: value},
291386
acc
@@ -313,7 +408,7 @@ if Code.ensure_loaded?(Phoenix.Component) do
313408
raise error
314409
end
315410

316-
defp update_mode({updates, resume}, {client, name, query, pid, component}) do
411+
defp update_mode({updates, resume}, {client, name, query, shape, pid, component}) do
317412
# need to send every update as a separate message.
318413

319414
for event <- updates |> Enum.reverse() |> Enum.map(&wrap_msg(&1, name, component)),
@@ -323,7 +418,8 @@ if Code.ensure_loaded?(Phoenix.Component) do
323418

324419
Task.start_link(fn ->
325420
client
326-
|> Electric.Client.stream(query, resume: resume, replica: :full)
421+
|> Electric.Client.stream(shape, resume: resume, replica: :full)
422+
|> keyed_stream(query)
327423
|> Stream.each(&send_live_event(&1, pid, name, component))
328424
|> Stream.run()
329425
end)

lib/phoenix/sync/predefined_shape.ex

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,11 @@ defmodule Phoenix.Sync.PredefinedShape do
156156
|> ShapeDefinition.params(format: :keyword)
157157
end
158158

159+
@doc false
160+
def to_shape(%__MODULE__{} = predefined_shape) do
161+
to_shape_definition(predefined_shape)
162+
end
163+
159164
@doc false
160165
def to_stream_params(%__MODULE__{} = predefined_shape) do
161166
{to_shape_definition(predefined_shape), predefined_shape.stream_config}

test/phoenix/sync/live_view_test.exs

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ defmodule Phoenix.Sync.LiveViewTest do
3434

3535
Client.Mock.async_response(client,
3636
status: 200,
37-
schema: %{id: %{type: "int8"}, name: %{type: "text"}},
37+
schema: %{id: %{type: "uuid"}, name: %{type: "text"}},
3838
last_offset: Client.Offset.first(),
3939
shape_handle: "users-1",
4040
body: Client.Mock.transaction(users, operation: :insert)
@@ -121,7 +121,7 @@ defmodule Phoenix.Sync.LiveViewTest do
121121

122122
Client.Mock.async_response(client,
123123
status: 200,
124-
schema: %{id: %{type: "int8"}, name: %{type: "text"}},
124+
schema: %{id: %{type: "uuid"}, name: %{type: "text"}},
125125
last_offset: Client.Offset.first(),
126126
shape_handle: "users-1",
127127
body: body
@@ -158,7 +158,7 @@ defmodule Phoenix.Sync.LiveViewTest do
158158

159159
Client.Mock.async_response(client,
160160
status: 200,
161-
schema: %{id: %{type: "int8"}, name: %{type: "text"}},
161+
schema: %{id: %{type: "uuid"}, name: %{type: "text"}},
162162
last_offset: Client.Offset.first(),
163163
shape_handle: "users-1",
164164
body: Client.Mock.transaction(users, operation: :insert)
@@ -224,6 +224,70 @@ defmodule Phoenix.Sync.LiveViewTest do
224224
assert %Electric.Client.Error{message: %{"message" => "this is wrong"}} = error
225225
end
226226
end
227+
228+
defp add_message_keys(txn) do
229+
Enum.map(txn, fn
230+
%{"value" => %{"id" => id}} = msg ->
231+
Map.put(msg, "key", ~s|"public"."users"/"#{id}"|)
232+
233+
msg ->
234+
msg
235+
end)
236+
end
237+
238+
test "allows for keyword-based shapes", %{conn: conn} do
239+
{:ok, client} = Client.Mock.new()
240+
241+
users = [
242+
%{id: "6dfea52e-1096-4b62-aafd-838ddd49477d", name: "User 1"},
243+
%{id: "9fc8f0a7-42e9-4473-9981-43a1904cd88a", name: "User 2"},
244+
%{id: "4252d858-8764-4069-bb8c-e670f899b80a", name: "User 3"}
245+
]
246+
247+
Client.Mock.async_response(client,
248+
status: 200,
249+
schema: %{id: %{type: "uuid", pk_position: 0}, name: %{type: "text"}},
250+
last_offset: Client.Offset.first(),
251+
shape_handle: "users-1",
252+
# TODO: update the mock client to add the key by default
253+
body: Client.Mock.transaction(users, operation: :insert) |> add_message_keys()
254+
)
255+
256+
conn =
257+
conn
258+
|> put_private(:electric_client, client)
259+
|> put_private(:test_pid, self())
260+
261+
{:ok, lv, html} = live(conn, "/stream/keyword")
262+
263+
for %{name: name} <- users do
264+
assert html =~ name
265+
end
266+
267+
users2 = [
268+
%{id: "53183977-bd54-4171-9697-51e13b0ff7ca", name: "User 4"},
269+
%{id: "92d42f40-cf16-4d51-a663-171d9fa1a21a", name: "User 5"},
270+
%{id: "04e15019-010e-4aa1-8eb0-33132099d05b", name: "User 6"}
271+
]
272+
273+
{:ok, _req} =
274+
Client.Mock.response(client,
275+
status: 200,
276+
last_offset: Client.Offset.new(3, 2),
277+
shape_handle: "users-1",
278+
body: Client.Mock.transaction(users2, lsn: 3, operation: :insert) |> add_message_keys()
279+
)
280+
281+
for _ <- users2 do
282+
assert_receive {:sync, _}
283+
end
284+
285+
html = render(lv)
286+
287+
for %{name: name} <- users ++ users2 do
288+
assert html =~ name
289+
end
290+
end
227291
end
228292

229293
describe "electric_client_configuration/1" do

test/support/live_views/stream.ex

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,3 +245,44 @@ defmodule Phoenix.Sync.LiveViewTest.StreamSandbox do
245245
{:noreply, Phoenix.Sync.LiveView.sync_stream_update(socket, event)}
246246
end
247247
end
248+
249+
defmodule Phoenix.Sync.LiveViewTest.StreamLiveKeyword do
250+
use Phoenix.LiveView
251+
252+
def run(lv, func) do
253+
GenServer.call(lv.pid, {:run, func})
254+
end
255+
256+
def render(assigns) do
257+
~H"""
258+
<div id="users" phx-update="stream">
259+
<div :for={{id, user} <- @streams.users} id={id}>
260+
<%= user["name"] %>
261+
</div>
262+
</div>
263+
"""
264+
end
265+
266+
def mount(_params, _session, socket) do
267+
client =
268+
get_in(socket.private.connect_info.private, [:electric_client]) ||
269+
raise "missing client configuration"
270+
271+
parent =
272+
get_in(socket.private.connect_info.private, [:test_pid]) ||
273+
raise "missing parent pid configuration"
274+
275+
{:ok,
276+
socket
277+
|> assign(:test_pid, parent)
278+
|> Phoenix.Sync.LiveView.sync_stream(:users, [table: "users", where: "name != ''"],
279+
client: client
280+
)}
281+
end
282+
283+
def handle_info({:sync, event}, socket) do
284+
# send message to test pid, just for sync
285+
send(socket.assigns.test_pid, {:sync, event})
286+
{:noreply, Phoenix.Sync.LiveView.sync_stream_update(socket, event)}
287+
end
288+
end

test/support/router.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ defmodule Phoenix.Sync.LiveViewTest.Router do
2424
live "/stream", StreamLive
2525
live "/stream/with-component", StreamLiveWithComponent
2626
live "/stream/sandbox", StreamSandbox
27+
live "/stream/keyword", StreamLiveKeyword
2728
end
2829

2930
scope "/" do

0 commit comments

Comments
 (0)