Skip to content

Commit 607f9bf

Browse files
authored
refactor transaction (#128)
1 parent 401cede commit 607f9bf

17 files changed

+634
-328
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111
* add telemetry support for execution
1212
* add new Repo module (thanks to daskycodes)
1313
* add missing typespecs (thanks to fdie)
14+
* refactor transaction api to support nested transaction
1415

1516
## 0.8.4 (2022-03-09)
1617
* Bugfix

README.md

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -634,14 +634,38 @@ For more information see:
634634
Since MongoDB 4.x, transactions for multiple write operations are possible. The [Mongo.Session](https://hexdocs.pm/mongodb_driver/Mongo.Session.html#content) is responsible for the details and you can use a convenient api for transactions:
635635

636636
```elixir
637-
alias Mongo.Session
638637

639-
{:ok, ids} = Session.with_transaction(top, fn opts ->
640-
{:ok, %InsertOneResult{:inserted_id => id1}} = Mongo.insert_one(top, "dogs", %{name: "Greta"}, opts)
641-
{:ok, %InsertOneResult{:inserted_id => id2}} = Mongo.insert_one(top, "dogs", %{name: "Waldo"}, opts)
642-
{:ok, %InsertOneResult{:inserted_id => id3}} = Mongo.insert_one(top, "dogs", %{name: "Tom"}, opts)
638+
{:ok, ids} = Mongo.transaction(top, fn ->
639+
{:ok, %InsertOneResult{:inserted_id => id1}} = Mongo.insert_one(top, "dogs", %{name: "Greta"})
640+
{:ok, %InsertOneResult{:inserted_id => id2}} = Mongo.insert_one(top, "dogs", %{name: "Waldo"})
641+
{:ok, %InsertOneResult{:inserted_id => id3}} = Mongo.insert_one(top, "dogs", %{name: "Tom"})
643642
{:ok, [id1, id2, id3]}
644643
end, w: 1)
644+
645+
```
646+
The transaction/3 function supports nesting. This allows the functions to be called from each other and all write operations
647+
are still in the same transaction. The session is stored in the process dictionary under the key `:session`. The surrounding
648+
transaction/3 call creates the session and starts the transaction, storing the session in the process dictionary, commits or
649+
aborts the transaction. All other transaction/3 calls just call the function parameter without other actions.
650+
651+
```elixir
652+
def insert_dog(top, name) do
653+
Mongo.insert_one(top, "dogs", %{name: name})
654+
end
655+
656+
def insert_dogs(top) do
657+
Mongo.transaction(top, fn ->
658+
insert_dog(top, "Tom")
659+
insert_dog(top, "Bell")
660+
insert_dog(top, "Fass")
661+
:ok
662+
end)
663+
end
664+
665+
:ok = Mongo.transaction(top, fn ->
666+
insert_dog(top, "Greta")
667+
insert_dogs(top)
668+
end)
645669
```
646670

647671
It is also possible to get more control over the progress of the transaction:
@@ -660,11 +684,7 @@ Mongo.insert_one(top, "dogs", %{name: "Tom"}, session: session)
660684
:ok = Session.end_session(top, session)
661685
```
662686

663-
For more information see:
664-
665-
- [Mongo.Session](https://hexdocs.pm/mongodb_driver/Mongo.Session.html#content)
666-
667-
and have a look at the test units as well.
687+
For more information see [Mongo.Session](https://hexdocs.pm/mongodb_driver/Mongo.Session.html#content) and have a look at the test units as well.
668688

669689
### Command Monitoring
670690

@@ -713,12 +733,6 @@ $ mongod --sslMode allowSSL --sslPEMKeyFile /path/to/mongodb.pem
713733
- For `--sslMode` you can use one of `allowSSL` or `preferSSL`
714734
- You can enable any other options you want when starting `mongod`
715735

716-
## More Examples
717-
718-
There are some basic examples in the `example` folder. But if you want to see the driver in action
719-
take a look at [Vega](https://github.com/zookzook/vega), especially the [Board.ex](https://github.com/zookzook/vega/blob/master/lib/vega/board.ex) module for using the transaction api together with
720-
bulk operations.
721-
722736
## Special Thanks
723737

724738
Special thanks to [JetBrains](https://www.jetbrains.com/?from=elixir-mongodb-driver) for providing a free JetBrains Open Source license for their complete toolbox.

lib/mongo.ex

Lines changed: 185 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ defmodule Mongo do
5757
use Bitwise
5858
use Mongo.Messages
5959

60+
import Mongo.Session, only: [in_read_session: 3, in_write_session: 3]
6061
alias Mongo.Query
6162
alias Mongo.Topology
6263
alias Mongo.UrlParser
@@ -67,6 +68,7 @@ defmodule Mongo do
6768
alias Mongo.Error
6869

6970
@timeout 15_000
71+
@retry_timeout_seconds 120
7072

7173
@type conn :: DbConnection.Conn
7274
@type collection :: String.t()
@@ -239,6 +241,153 @@ defmodule Mongo do
239241
<<u0::48, @uuid_v4::4, u1::12, @variant10::2, u2::62>>
240242
end
241243

244+
@doc """
245+
Convenient function for running multiple write commands in a transaction.
246+
247+
In case of `TransientTransactionError` or `UnknownTransactionCommitResult` the function will retry the whole transaction or
248+
the commit of the transaction. You can specify a timeout (`:transaction_retry_timeout_s`) to limit the time of repeating.
249+
The default value is 120 seconds. If you don't wait so long, you call `with_transaction` with the
250+
option `transaction_retry_timeout_s: 10`. In this case after 10 seconds of retrying, the function will return
251+
an error.
252+
253+
## Example
254+
255+
{:ok, ids} = Mongo.transaction(top, fn ->
256+
{:ok, %InsertOneResult{:inserted_id => id1}} = Mongo.insert_one(top, "dogs", %{name: "Greta"})
257+
{:ok, %InsertOneResult{:inserted_id => id2}} = Mongo.insert_one(top, "dogs", %{name: "Waldo"})
258+
{:ok, %InsertOneResult{:inserted_id => id3}} = Mongo.insert_one(top, "dogs", %{name: "Tom"})
259+
{:ok, [id1, id2, id3]}
260+
end, transaction_retry_timeout_s: 10)
261+
262+
If transaction/3 is called inside another transaction, the function is simply executed, without wrapping the new transaction call in any way.
263+
If there is an error in the inner transaction and the error is rescued, or the inner transaction is aborted (abort_transaction/1),
264+
the whole outer transaction is aborted, guaranteeing nothing will be committed.
265+
"""
266+
@spec transaction(GenServer.server(), function) :: {:ok, any()} | :error | {:error, term}
267+
def transaction(topology_pid, fun, opts \\ []) do
268+
:session
269+
|> Process.get()
270+
|> do_transaction(topology_pid, fun, opts)
271+
end
272+
273+
defp do_transaction(nil, topology_pid, fun, opts) do
274+
## try catch
275+
with {:ok, session} <- Session.start_session(topology_pid, :write, opts) do
276+
Process.put(:session, session)
277+
278+
try do
279+
run_in_transaction(topology_pid, session, fun, DateTime.utc_now(), opts)
280+
rescue
281+
error ->
282+
{:error, error}
283+
after
284+
Session.end_session(topology_pid, session)
285+
Process.delete(:session)
286+
end
287+
end
288+
end
289+
290+
defp do_transaction(_session, _topology_pid, fun, _opts) when is_function(fun, 0) do
291+
fun.()
292+
end
293+
294+
defp do_transaction(_session, _topology_pid, fun, opts) when is_function(fun, 1) do
295+
fun.(opts)
296+
end
297+
298+
defp run_in_transaction(topology_pid, session, fun, start_time, opts) do
299+
Session.start_transaction(session)
300+
301+
case run_function(fun, Keyword.merge(opts, session: session)) do
302+
:ok ->
303+
handle_commit(session, start_time)
304+
305+
{:ok, result} ->
306+
handle_commit(session, start_time, result)
307+
308+
{:error, error} ->
309+
## check in case of an error while processing transaction
310+
Session.abort_transaction(session)
311+
timeout = opts[:transaction_retry_timeout_s] || @retry_timeout_seconds
312+
313+
case Error.has_label(error, "TransientTransactionError") && DateTime.diff(DateTime.utc_now(), start_time, :second) < timeout do
314+
true ->
315+
run_in_transaction(topology_pid, session, fun, start_time, opts)
316+
317+
false ->
318+
{:error, error}
319+
end
320+
321+
:error ->
322+
Session.abort_transaction(session)
323+
:error
324+
325+
other ->
326+
## everything else is an error
327+
Session.abort_transaction(session)
328+
{:error, other}
329+
end
330+
end
331+
332+
##
333+
# calling the function and wrapping it to catch exceptions
334+
#
335+
defp run_function(fun, _opts) when is_function(fun, 0) do
336+
try do
337+
fun.()
338+
rescue
339+
reason -> {:error, reason}
340+
end
341+
end
342+
343+
defp run_function(fun, opts) when is_function(fun, 1) do
344+
try do
345+
fun.(opts)
346+
rescue
347+
reason -> {:error, reason}
348+
end
349+
end
350+
351+
defp handle_commit(session, start_time) do
352+
case Session.commit_transaction(session, start_time) do
353+
## everything is okay
354+
:ok ->
355+
:ok
356+
357+
error ->
358+
## the rest is an error
359+
Session.abort_transaction(session)
360+
error
361+
end
362+
end
363+
364+
defp handle_commit(session, start_time, result) do
365+
case Session.commit_transaction(session, start_time) do
366+
## everything is okay
367+
:ok ->
368+
{:ok, result}
369+
370+
error ->
371+
## the rest is an error
372+
Session.abort_transaction(session)
373+
error
374+
end
375+
end
376+
377+
def abort_transaction(reason) do
378+
:session
379+
|> Process.get()
380+
|> abort_transaction(reason)
381+
end
382+
383+
def abort_transaction(nil, reason) do
384+
raise Mongo.Error.exception("Aborting transaction (#{inspect(reason)}) is not allowed, because there is no active transaction!")
385+
end
386+
387+
def abort_transaction(_session, reason) do
388+
raise Mongo.Error.exception("Aborting transaction, reason #{inspect(reason)}")
389+
end
390+
242391
@doc """
243392
Creates a change stream cursor on collections.
244393
@@ -427,55 +576,45 @@ defmodule Mongo do
427576
## check, if retryable reads are enabled
428577
opts = Mongo.retryable_reads(opts)
429578

430-
with {:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts),
431-
result <- exec_command_session(session, cmd, opts),
432-
:ok <- Session.end_implict_session(topology_pid, session) do
433-
case result do
434-
{:error, error} ->
435-
cond do
436-
Error.not_writable_primary_or_recovering?(error, opts) ->
437-
## in case of explicitly
438-
issue_command(topology_pid, cmd, :read, Keyword.put(opts, :retry_counter, 2))
579+
case in_read_session(topology_pid, &exec_command_session(&1, cmd, &2), opts) do
580+
{:ok, doc} ->
581+
{:ok, doc}
439582

440-
Error.should_retry_read(error, cmd, opts) ->
441-
issue_command(topology_pid, cmd, :read, Keyword.put(opts, :read_counter, 2))
583+
{:error, error} ->
584+
cond do
585+
Error.not_writable_primary_or_recovering?(error, opts) ->
586+
## in case of explicitly
587+
issue_command(topology_pid, cmd, :read, Keyword.put(opts, :retry_counter, 2))
442588

443-
true ->
444-
{:error, error}
445-
end
589+
Error.should_retry_read(error, cmd, opts) ->
590+
issue_command(topology_pid, cmd, :read, Keyword.put(opts, :read_counter, 2))
446591

447-
_other ->
448-
result
449-
end
450-
else
451-
_ -> {:error, Mongo.Error.exception("Command processing error")}
592+
true ->
593+
{:error, error}
594+
end
452595
end
453596
end
454597

455598
def issue_command(topology_pid, cmd, :write, opts) do
456599
## check, if retryable reads are enabled
457600
opts = Mongo.retryable_writes(opts, acknowledged?(cmd[:writeConcerns]))
458601

459-
with {:ok, session} <- Session.start_implicit_session(topology_pid, :write, opts),
460-
result <- exec_command_session(session, cmd, opts),
461-
:ok <- Session.end_implict_session(topology_pid, session) do
462-
case result do
463-
{:error, error} ->
464-
cond do
465-
Error.not_writable_primary_or_recovering?(error, opts) ->
466-
## in case of explicitly
467-
issue_command(topology_pid, cmd, :read, Keyword.put(opts, :retry_counter, 2))
602+
case in_write_session(topology_pid, &exec_command_session(&1, cmd, &2), opts) do
603+
{:ok, doc} ->
604+
{:ok, doc}
468605

469-
Error.should_retry_write(error, cmd, opts) ->
470-
issue_command(topology_pid, cmd, :write, Keyword.put(opts, :write_counter, 2))
606+
{:error, error} ->
607+
cond do
608+
Error.not_writable_primary_or_recovering?(error, opts) ->
609+
## in case of explicitly
610+
issue_command(topology_pid, cmd, :read, Keyword.put(opts, :retry_counter, 2))
471611

472-
true ->
473-
{:error, error}
474-
end
612+
Error.should_retry_write(error, cmd, opts) ->
613+
issue_command(topology_pid, cmd, :write, Keyword.put(opts, :write_counter, 2))
475614

476-
result ->
477-
result
478-
end
615+
true ->
616+
{:error, error}
617+
end
479618
end
480619
end
481620

@@ -1389,16 +1528,16 @@ defmodule Mongo do
13891528
Convenient function that drops the database `name`.
13901529
"""
13911530
@spec drop_database(GenServer.server(), String.t() | nil) :: :ok | {:error, Mongo.Error.t()}
1392-
def drop_database(topology_pid, name \\ nil)
1531+
def drop_database(topology_pid, name, opts \\ [])
13931532

1394-
def drop_database(topology_pid, nil) do
1395-
with {:ok, _} <- Mongo.issue_command(topology_pid, [dropDatabase: 1], :write, []) do
1533+
def drop_database(topology_pid, nil, opts) do
1534+
with {:ok, _} <- Mongo.issue_command(topology_pid, [dropDatabase: 1], :write, opts) do
13961535
:ok
13971536
end
13981537
end
13991538

1400-
def drop_database(topology_pid, name) do
1401-
with {:ok, _} <- Mongo.issue_command(topology_pid, [dropDatabase: 1], :write, database: name) do
1539+
def drop_database(topology_pid, name, opts) do
1540+
with {:ok, _} <- Mongo.issue_command(topology_pid, [dropDatabase: 1], :write, Keyword.put(opts, :database, name)) do
14021541
:ok
14031542
end
14041543
end
@@ -1442,7 +1581,7 @@ defmodule Mongo do
14421581
def retryable_reads(opts) do
14431582
case opts[:read_counter] do
14441583
nil ->
1445-
case opts[:retryable_reads] == true && opts[:session] == nil do
1584+
case opts[:retryable_reads] == true && get_session(opts) == nil do
14461585
true -> opts ++ [read_counter: 1]
14471586
false -> opts
14481587
end
@@ -1651,4 +1790,8 @@ defmodule Mongo do
16511790
defp command_color(:commitTransaction), do: :magenta
16521791
defp command_color(:configureFailPoint), do: :blue
16531792
defp command_color(_), do: nil
1793+
1794+
def get_session(opts) do
1795+
Process.get(:session) || opts[:session]
1796+
end
16541797
end

0 commit comments

Comments
 (0)