Skip to content

Commit 6961e05

Browse files
authored
live-stream: Handle client errors in the live-view process (#37)
An upstream server error now raises directly rather than resulting in a secondary nimbleoptions validation error caused by a missing resume message requires new error handling config in the elixir client: electric-sql/electric#2721
1 parent 8f6aa5d commit 6961e05

File tree

4 files changed

+49
-40
lines changed

4 files changed

+49
-40
lines changed

lib/phoenix/sync/live_view.ex

Lines changed: 18 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -272,57 +272,44 @@ if Code.ensure_loaded?(Phoenix.Component) do
272272
pid = self()
273273

274274
client
275-
|> Electric.Client.stream(query, live: false, replica: :full)
275+
|> Electric.Client.stream(query, live: false, replica: :full, errors: :stream)
276276
|> Stream.transform(
277277
fn -> {[], nil} end,
278-
&live_stream_message(&1, &2, client, name, query, pid, component),
279-
&update_mode(&1, client, name, query, pid, component)
278+
&live_stream_message/2,
279+
&update_mode(&1, {client, name, query, pid, component})
280280
)
281281
end
282282

283283
defp live_stream_message(
284284
%Message.ChangeMessage{headers: %{operation: :insert}, value: value},
285-
acc,
286-
_client,
287-
_name,
288-
_query,
289-
_pid,
290-
_component
285+
acc
291286
) do
292287
{[value], acc}
293288
end
294289

295-
defp live_stream_message(
296-
%Message.ChangeMessage{headers: %{operation: operation}} = msg,
297-
{updates, resume},
298-
_client,
299-
_name,
300-
_query,
301-
_pid,
302-
_component
303-
)
304-
when operation in [:update, :delete] do
290+
defp live_stream_message(%Message.ChangeMessage{} = msg, {updates, resume}) do
305291
{[], {[msg | updates], resume}}
306292
end
307293

308-
defp live_stream_message(
309-
%Message.ResumeMessage{} = resume,
310-
{updates, nil},
311-
_client,
312-
_name,
313-
_query,
314-
_pid,
315-
_component
316-
) do
294+
defp live_stream_message(%Message.ControlMessage{}, acc) do
295+
{[], acc}
296+
end
297+
298+
defp live_stream_message(%Message.ResumeMessage{} = resume, {updates, nil}) do
317299
{[], {updates, resume}}
318300
end
319301

320-
defp live_stream_message(_message, acc, _client, _name, _query, _pid, _component) do
321-
{[], acc}
302+
defp live_stream_message(%Electric.Client.Error{} = error, _acc) do
303+
{[], {error, nil}}
322304
end
323305

324-
defp update_mode({updates, resume}, client, name, query, pid, component) do
306+
defp update_mode({%Electric.Client.Error{} = error, _resume}, _state) do
307+
raise error
308+
end
309+
310+
defp update_mode({updates, resume}, {client, name, query, pid, component}) do
325311
# need to send every update as a separate message.
312+
326313
for event <- updates |> Enum.reverse() |> Enum.map(&wrap_msg(&1, name, component)),
327314
do: send(pid, {:sync, event})
328315

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ defmodule Phoenix.Sync.MixProject do
3939
# require an exact version because electric moves very quickly atm
4040
# and a more generous specification would inevitably break.
4141
{:electric, "== 1.0.1", optional: true},
42-
{:electric_client, "== 0.3.0"}
42+
{:electric_client, ">= 0.5.0-beta-1"}
4343
] ++ deps_for_env(Mix.env())
4444
end
4545

0 commit comments

Comments
 (0)