Skip to content

Commit 7a14c26

Browse files
committed
add a new transaction/5 function for low-level usage
also expose `parse_transaction/2`
1 parent 0d03d37 commit 7a14c26

File tree

2 files changed

+213
-8
lines changed

2 files changed

+213
-8
lines changed

lib/phoenix/sync/writer.ex

Lines changed: 133 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1116,8 +1116,7 @@ defmodule Phoenix.Sync.Writer do
11161116
# operations before doing anything. So i can just add an error to a blank
11171117
# multi and return that and the transaction step will fail before touching
11181118
# the repo.
1119-
with {:parse, {:ok, %Transaction{} = txn}} <- {:parse, parse_transaction(writer, changes)},
1120-
{:check, :ok} <- {:check, check_transaction(writer, txn)} do
1119+
with {:ok, %Transaction{} = txn} <- parse_check(writer, changes) do
11211120
txn.operations
11221121
|> Enum.reduce(
11231122
start_multi(txn),
@@ -1129,7 +1128,34 @@ defmodule Phoenix.Sync.Writer do
11291128
end
11301129
end
11311130

1132-
defp parse_transaction(%__MODULE__{} = writer, changes) do
1131+
defp parse_check(%__MODULE__{} = writer, changes) do
1132+
with {:parse, {:ok, %Transaction{} = txn}} <- {:parse, parse_transaction(writer, changes)},
1133+
{:check, :ok} <- {:check, check_transaction(writer, txn)} do
1134+
{:ok, txn}
1135+
end
1136+
end
1137+
1138+
@doc """
1139+
Use the parser configured in the given [`Writer`](`#{inspect(__MODULE__)}`)
1140+
instance to decode the given transaction data.
1141+
1142+
This can be used to handle mutation operations explicitly:
1143+
1144+
{:ok, txn} = #{inspect(__MODULE__)}.parse_transaction(writer, my_json_tx_data)
1145+
1146+
{:ok, txid} =
1147+
Repo.transaction(fn ->
1148+
Enum.each(txn.operations, fn operation ->
1149+
# do something wih the given operation
1150+
# raise if something is wrong...
1151+
end)
1152+
# return the transaction id
1153+
#{inspect(__MODULE__)}.txid!(Repo)
1154+
end)
1155+
"""
1156+
@spec parse_transaction(t(), Format.transaction_data()) ::
1157+
{:ok, Transaction.t()} | {:error, term()}
1158+
def parse_transaction(%__MODULE__{} = writer, changes) do
11331159
case writer.parser do
11341160
fun when is_function(fun, 1) ->
11351161
fun.(changes)
@@ -1529,7 +1555,81 @@ defmodule Phoenix.Sync.Writer do
15291555
end
15301556

15311557
@doc """
1532-
Extract the transaction id from changes returned from `Repo.transaction`.
1558+
Apply operations from a mutation transaction directly via a transaction.
1559+
1560+
`operation_fun` is a 1-arity function that receives each of the
1561+
`%#{inspect(__MODULE__.Operation)}{}` structs within the mutation data and
1562+
should apply them appropriately.
1563+
1564+
The `operation_fun` callback is done within a `c:Ecto.Repo.transaction/2` so
1565+
any exceptions will cause the entire transaction to be aborted.
1566+
1567+
This function will also raise if the transaction data fails to parse.
1568+
1569+
{:ok, txid} =
1570+
#{inspect(__MODULE__)}.new(format: #{inspect(__MODULE__.Format.TanstackOptimistic)})
1571+
|> #{inspect(__MODULE__)}.transaction(
1572+
my_encoded_txn,
1573+
MyApp.Repo,
1574+
fn
1575+
%{operation: :insert, relation: [_, "todos"], change: change} ->
1576+
# insert a Todo
1577+
%{operation: :update, relation: [_, "todos"], data: data, change: change} ->
1578+
# update a Todo
1579+
%{operation: :delete, relation: [_, "todos"], data: data} ->
1580+
# delete a Todo
1581+
end
1582+
)
1583+
1584+
The `opts` are passed onto the `c:Ecto.Repo.transaction/2` call.
1585+
1586+
This is equivalent to the below (if using the custom
1587+
`parse_transaction_data/1` parser function):
1588+
1589+
{:ok, txn} =
1590+
#{inspect(__MODULE__.Format.TanstackOptimistic)}.parse_transaction(my_encoded_txn)
1591+
1592+
{:ok, txid} =
1593+
MyApp.Repo.transaction(fn ->
1594+
Enum.each(txn.operations, fn
1595+
%{operation: :insert, relation: [_, "todos"], change: change} ->
1596+
# insert a Todo
1597+
%{operation: :update, relation: [_, "todos"], data: data, change: change} ->
1598+
# update a Todo
1599+
%{operation: :delete, relation: [_, "todos"], data: data} ->
1600+
# delete a Todo
1601+
end)
1602+
#{inspect(__MODULE__)}.txid!(MyApp.Repo)
1603+
end)
1604+
"""
1605+
@spec transaction(
1606+
t(),
1607+
Format.transaction_data(),
1608+
Ecto.Repo.t(),
1609+
operation_fun :: (Operation.t() -> any()),
1610+
keyword()
1611+
) ::
1612+
{:ok, txid()} | {:error, any()}
1613+
def transaction(%__MODULE__{} = writer, changes, repo, operation_fun, opts \\ [])
1614+
when is_function(operation_fun, 1) and is_atom(repo) do
1615+
case parse_transaction(writer, changes) do
1616+
{:ok, %Transaction{} = txn} ->
1617+
repo.transaction(
1618+
fn ->
1619+
Enum.each(txn.operations, operation_fun)
1620+
txid!(repo)
1621+
end,
1622+
opts
1623+
)
1624+
1625+
{:error, reason} ->
1626+
raise Error, message: reason
1627+
end
1628+
end
1629+
1630+
@doc """
1631+
Extract the transaction id from changes or from a `Ecto.Repo` within a
1632+
transaction.
15331633
15341634
This allows you to use a standard `c:Ecto.Repo.transaction/2` call to apply
15351635
mutations defined using `apply/2` and extract the transaction id afterwards.
@@ -1544,20 +1644,45 @@ defmodule Phoenix.Sync.Writer do
15441644
|> MyApp.Repo.transaction()
15451645
15461646
{:ok, txid} = Phoenix.Sync.Writer.txid(changes)
1647+
1648+
It also allows you to get a transaction id from any active transaction:
1649+
1650+
MyApp.Repo.transaction(fn ->
1651+
{:ok, txid} = #{inspect(__MODULE__)}.txid(MyApp.Repo)
1652+
end)
1653+
1654+
Attempting to run `txid/1` on a repo outside a transaction will return an
1655+
error.
15471656
"""
15481657
@spec txid(Ecto.Multi.changes()) :: {:ok, txid()} | :error
15491658
def txid(%{@txid_name => txid} = _changes), do: {:ok, txid}
1550-
def txid(_), do: :error
1659+
def txid(changes) when is_map(changes), do: :error
1660+
1661+
def txid(repo) when is_atom(repo) do
1662+
if repo.in_transaction?() do
1663+
with {:ok, %{rows: [[txid]]}} = repo.query(@txid_query) do
1664+
{:ok, txid}
1665+
end
1666+
else
1667+
{:error, %Error{message: "not in a transaction"}}
1668+
end
1669+
end
15511670

15521671
@doc """
1553-
Returns the transaction id from a `Ecto.Multi.changes()` result or raises if
1554-
not found.
1672+
Returns the a transaction id or raises on an error.
15551673
15561674
See `txid/1`.
15571675
"""
15581676
@spec txid!(Ecto.Multi.changes()) :: txid()
15591677
def txid!(%{@txid_name => txid} = _changes), do: txid
1560-
def txid!(_), do: raise(ArgumentError, message: "No txid in change data")
1678+
def txid!(%{}), do: raise(ArgumentError, message: "No txid in change data")
1679+
1680+
def txid!(repo) when is_atom(repo) do
1681+
case txid(repo) do
1682+
{:ok, txid} -> txid
1683+
{:error, reason} -> raise reason
1684+
end
1685+
end
15611686

15621687
@doc """
15631688
Return a unique operation name for use in `pre_apply` or `post_apply` callbacks.

test/phoenix/sync/writer_test.exs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -959,6 +959,86 @@ defmodule Phoenix.Sync.WriterTest do
959959

960960
assert is_integer(txid)
961961
end
962+
963+
test "supports any unparsed mutation data" do
964+
changes = [
965+
%{
966+
"type" => "insert",
967+
"syncMetadata" => %{"relation" => ["public", "todos_local"]},
968+
"modified" => %{"id" => "98", "title" => "New todo", "completed" => "false"}
969+
},
970+
%{
971+
"type" => "insert",
972+
"syncMetadata" => %{"relation" => ["public", "todos_local"]},
973+
"modified" => %{"id" => "99", "title" => "Disposable todo", "completed" => "false"}
974+
},
975+
%{
976+
"type" => "delete",
977+
"syncMetadata" => %{"relation" => ["public", "todos_local"]},
978+
"original" => %{"id" => "2"}
979+
},
980+
%{
981+
"type" => "update",
982+
"syncMetadata" => %{"relation" => ["public", "todos_local"]},
983+
"original" => %{"id" => "1", "title" => "First todo", "completed" => "false"},
984+
"changes" => %{"title" => "Changed title"}
985+
},
986+
%{
987+
"type" => "update",
988+
"syncMetadata" => %{"relation" => ["public", "todos_local"]},
989+
"original" => %{"id" => "1", "title" => "Changed title", "completed" => "false"},
990+
"changes" => %{"completed" => "true"}
991+
},
992+
%{
993+
"type" => "delete",
994+
"syncMetadata" => %{"relation" => ["public", "todos_local"]},
995+
"original" => %{"id" => "99", "title" => "New todo", "completed" => "false"}
996+
},
997+
%{
998+
"type" => "update",
999+
"syncMetadata" => %{"relation" => ["public", "todos_local"]},
1000+
"original" => %{"id" => "98", "title" => "Working todo", "completed" => "false"},
1001+
"changes" => %{"title" => "Working todo", "completed" => "true"}
1002+
}
1003+
]
1004+
1005+
parent = self()
1006+
1007+
assert {:ok, txid} =
1008+
writer()
1009+
|> Writer.transaction(
1010+
changes,
1011+
Repo,
1012+
fn
1013+
%Writer.Operation{
1014+
operation: op,
1015+
relation: relation,
1016+
data: data,
1017+
changes: changes
1018+
} ->
1019+
send(parent, {op, relation, data, changes})
1020+
end
1021+
)
1022+
1023+
assert is_integer(txid)
1024+
assert_receive {:insert, ["public", "todos_local"], %{}, %{"id" => "98"}}
1025+
assert_receive {:insert, ["public", "todos_local"], %{}, %{"id" => "99"}}
1026+
assert_receive {:delete, ["public", "todos_local"], %{"id" => "2"}, %{}}
1027+
assert_receive {:update, ["public", "todos_local"], %{"id" => "1"}, %{"title" => _}}
1028+
assert_receive {:update, ["public", "todos_local"], %{"id" => "1"}, %{"completed" => _}}
1029+
assert_receive {:delete, ["public", "todos_local"], %{"id" => "99"}, %{}}
1030+
1031+
assert_receive {:update, ["public", "todos_local"], %{"id" => "98"},
1032+
%{"title" => _, "completed" => _}}
1033+
end
1034+
1035+
test "parse errors return an error tuple" do
1036+
writer = Writer.new(parser: fn _ -> {:error, "no"} end)
1037+
1038+
assert_raise Writer.Error, fn ->
1039+
Writer.transaction(writer, [], Repo, fn _ -> nil end)
1040+
end
1041+
end
9621042
end
9631043

9641044
def parse_transaction(m) when is_list(m) do

0 commit comments

Comments
 (0)