Skip to content

Commit 3428717

Browse files
committed
better error handling when using transaction, #47
1 parent 50513f5 commit 3428717

File tree

5 files changed

+153
-12
lines changed

5 files changed

+153
-12
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
* refactored the test cases
77
* now using mtools for a MongoDB deployment in the travis ci environment
88
* travis ci uses only the latest MongoDB version [The failCommand](https://github.com/mongodb/mongo/wiki/The-%22failCommand%22-fail-point)
9+
* `Session.commit_transaction` returns now `:ok` or an error `{:error, %Mongo.Error{}}`
910

1011
* Bugfixes
1112
* Using `max_staleness_ms` > 0 results in a crash

lib/mongo/error.ex

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,11 @@ defmodule Mongo.Error do
108108
def should_retry_write(_error, _cmd, _opts) do
109109
false
110110
end
111+
112+
def has_label(%Mongo.Error{error_labels: []}, _label), do: false
113+
def has_label(%Mongo.Error{error_labels: labels}, label) do
114+
Enum.any?(labels, fn l -> l == label end)
115+
end
111116
end
112117

113118
defmodule Mongo.WriteError do

lib/mongo/events.ex

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ defmodule Mongo.Events do
6767

6868
defmodule RetryReadEvent do
6969
@moduledoc false
70-
7170
defstruct [
7271
:command, ## Returns the command.
7372
:command_name, ## Returns the command name.

lib/mongo/session.ex

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,14 @@ defmodule Mongo.Session do
100100
import Mongo.WriteConcern
101101

102102
alias BSON.Timestamp
103+
alias Mongo.Error
103104
alias Mongo.ReadPreference
104105
alias Mongo.Session
105106
alias Mongo.Session.ServerSession
106107
alias Mongo.Topology
107108

109+
@retry_timeout_seconds 120
110+
108111
@type t :: pid()
109112

110113
##
@@ -291,19 +294,21 @@ defmodule Mongo.Session do
291294
with {:ok, session} <- Session.start_session(topology_pid, :write, opts),
292295
:ok <- Session.start_transaction(session) do
293296

294-
with {:ok, result} <- run_function(fun, Keyword.merge(opts, session: session)) do
295-
commit_transaction(session)
297+
with {:ok, result} <- run_function(fun, Keyword.merge(opts, session: session)),
298+
commit_result <- commit_transaction(session) do
299+
296300
end_session(topology_pid, session)
297-
{:ok, result}
301+
case commit_result do
302+
:ok -> {:ok, result}
303+
error -> error
304+
end
298305
else
299306
error ->
300307
abort_transaction(session)
301308
end_session(topology_pid, session)
302309
error
303310
end
304-
305311
end
306-
307312
end
308313

309314
##
@@ -469,7 +474,11 @@ defmodule Mongo.Session do
469474
{:next_state, :transaction_committed, :ok}
470475
end
471476
def handle_call_event(:commit_transaction, :transaction_in_progress, data) do
472-
{:next_state, :transaction_committed, run_commit_command(data)}
477+
with :ok <- run_commit_command(data) do
478+
{:next_state, :transaction_committed, :ok}
479+
else
480+
error -> {:keep_state_and_data, error}
481+
end
473482
end
474483
def handle_call_event(:abort_transaction, :starting_transaction, _data) do
475484
{:next_state, :transaction_aborted, :ok}
@@ -514,21 +523,42 @@ defmodule Mongo.Session do
514523
##
515524
# Run the commit transaction command.
516525
#
517-
defp run_commit_command(%Session{conn: conn, recovery_token: recovery_token, server_session: %ServerSession{session_id: id, txn_num: txn_num}, opts: opts}) do
526+
defp run_commit_command(session) do
527+
run_commit_command(session, DateTime.utc_now(), :first)
528+
end
529+
530+
defp run_commit_command(%Session{conn: conn,
531+
recovery_token: recovery_token,
532+
server_session: %ServerSession{session_id: id, txn_num: txn_num},
533+
opts: opts} = session, time, n) do
534+
535+
##
536+
# Drivers should apply a majority write concern when retrying commitTransaction to guard against a transaction being applied twice.
537+
write_concern = case n do
538+
:first -> write_concern(opts)
539+
_ -> Map.put(write_concern(opts) || %{}, :w, :majority)
540+
end
518541

519542
cmd = [
520543
commitTransaction: 1,
521544
lsid: %{id: id},
522545
txnNumber: %BSON.LongNumber{value: txn_num},
523546
autocommit: false,
524-
writeConcern: write_concern(opts),
547+
writeConcern: write_concern, ## todo: w:majority
525548
maxTimeMS: max_time_ms(opts),
526549
recoveryToken: recovery_token
527550
] |> filter_nils()
528551

529-
_doc = Mongo.exec_command(conn, cmd, database: "admin")
530-
531-
:ok
552+
with {:ok, _doc} <- Mongo.exec_command(conn, cmd, database: "admin") do
553+
:ok
554+
else
555+
{:error, error} ->
556+
try_again = Error.has_label(error, "UnknownTransactionCommitResult") && DateTime.diff(DateTime.utc_now(), time, :second) < @retry_timeout_seconds
557+
case try_again do
558+
true -> run_commit_command(session, time, :retry)
559+
false -> {:error, error}
560+
end
561+
end
532562
end
533563

534564
defp max_time_ms(opts) do
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
defmodule Mongo.TransactionRetriesTest do
2+
3+
use CollectionCase
4+
5+
alias Mongo.Session
6+
7+
test "transaction returns an error", %{pid: top} do
8+
9+
coll = unique_collection()
10+
11+
:ok = Mongo.create(top, coll)
12+
13+
{:ok, session} = Session.start_session(top, :write, [])
14+
assert :ok = Session.start_transaction(session)
15+
16+
assert {:ok, _} = Mongo.insert_one(top, coll, %{name: "Greta"}, session: session)
17+
18+
cmd = [
19+
configureFailPoint: "failCommand",
20+
mode: [times: 1],
21+
data: [errorCode: 3, failCommands: ["commitTransaction"]]
22+
]
23+
24+
{:ok, _doc} = Mongo.admin_command(top, cmd)
25+
26+
{:error, %Mongo.Error{}} = Session.commit_transaction(session)
27+
28+
assert :ok == Session.abort_transaction(session)
29+
assert :ok == Session.end_session(top, session)
30+
end
31+
32+
@tag :mongo_4_3
33+
test "transaction retry", %{pid: top, catcher: catcher} do
34+
35+
coll = unique_collection()
36+
37+
:ok = Mongo.create(top, coll)
38+
39+
{:ok, session} = Session.start_session(top, :write, [])
40+
assert :ok = Session.start_transaction(session)
41+
42+
assert {:ok, _} = Mongo.insert_one(top, coll, %{name: "Greta"}, session: session)
43+
44+
cmd = [
45+
configureFailPoint: "failCommand",
46+
mode: [times: 3],
47+
data: [errorCode: 6, failCommands: ["commitTransaction"], errorLabels: ["UnknownTransactionCommitResult"]]
48+
]
49+
50+
{:ok, _doc} = Mongo.admin_command(top, cmd)
51+
assert :ok == Session.commit_transaction(session)
52+
53+
assert :ok == Session.end_session(top, session)
54+
55+
assert [:commitTransaction, :commitTransaction, :commitTransaction] = EventCatcher.failed_events(catcher) |> Enum.map(fn event -> event.command_name end)
56+
assert [:commitTransaction, :configureFailPoint, :insert, :create] = EventCatcher.succeeded_events(catcher) |> Enum.map(fn event -> event.command_name end)
57+
58+
end
59+
60+
test "with_transaction, return an error", %{pid: top} do
61+
62+
coll = unique_collection()
63+
64+
:ok = Mongo.create(top, coll)
65+
66+
assert {:error, %Mongo.Error{}} = Session.with_transaction(top, fn opts ->
67+
{:ok, _} = Mongo.insert_one(top, coll, %{name: "Greta"}, opts)
68+
{:ok, _} = Mongo.insert_one(top, coll, %{name: "Waldo"}, opts)
69+
{:ok, _} = Mongo.insert_one(top, coll, %{name: "Tom"}, opts)
70+
71+
cmd = [
72+
configureFailPoint: "failCommand",
73+
mode: [times: 1],
74+
data: [errorCode: 3, failCommands: ["commitTransaction"]]
75+
]
76+
77+
{:ok, _doc} = Mongo.admin_command(top, cmd)
78+
79+
{:ok, []}
80+
end)
81+
end
82+
83+
test "with_transaction, retry commit", %{pid: top} do
84+
85+
coll = unique_collection()
86+
87+
:ok = Mongo.create(top, coll)
88+
89+
assert {:ok, []} = Session.with_transaction(top, fn opts ->
90+
{:ok, _} = Mongo.insert_one(top, coll, %{name: "Greta"}, opts)
91+
{:ok, _} = Mongo.insert_one(top, coll, %{name: "Waldo"}, opts)
92+
{:ok, _} = Mongo.insert_one(top, coll, %{name: "Tom"}, opts)
93+
94+
cmd = [
95+
configureFailPoint: "failCommand",
96+
mode: [times: 3],
97+
data: [errorCode: 6, failCommands: ["commitTransaction"], errorLabels: ["UnknownTransactionCommitResult"]]
98+
]
99+
100+
{:ok, _doc} = Mongo.admin_command(top, cmd)
101+
102+
{:ok, []}
103+
end)
104+
end
105+
106+
end

0 commit comments

Comments
 (0)