Skip to content

Commit b8ba0d2

Browse files
committed
better support for errors while using bulk write, added some new tests for transaction, added missing write concern options
1 parent eb0dff2 commit b8ba0d2

File tree

5 files changed

+284
-28
lines changed

5 files changed

+284
-28
lines changed

lib/mongo.ex

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -279,20 +279,22 @@ defmodule Mongo do
279279
@spec find_one_and_update(GenServer.server, collection, BSON.document, BSON.document, Keyword.t) :: result(BSON.document) | {:ok, nil}
280280
def find_one_and_update(topology_pid, coll, filter, update, opts \\ []) do
281281
_ = modifier_docs(update, :update)
282+
282283
cmd = [
283284
findAndModify: coll,
284285
query: filter,
286+
sort: opts[:sort],
285287
update: update,
286-
bypassDocumentValidation: opts[:bypass_document_validation],
287-
maxTimeMS: opts[:max_time],
288-
fields: opts[:projection],
289288
new: should_return_new(opts[:return_document]),
290-
sort: opts[:sort],
289+
fields: opts[:projection],
291290
upsert: opts[:upsert],
291+
bypassDocumentValidation: opts[:bypass_document_validation],
292+
writeConcern: write_concern(opts),
293+
maxTimeMS: opts[:max_time],
292294
collation: opts[:collation]
293295
] |> filter_nils()
294296

295-
opts = Keyword.drop(opts, ~w(bypass_document_validation max_time projection return_document sort upsert collation)a)
297+
opts = Keyword.drop(opts, ~w(bypass_document_validation max_time projection return_document sort upsert collation w j wtimeout)a)
296298

297299
with {:ok, doc} <- issue_command(topology_pid, cmd, :write, opts) do
298300
{:ok, doc["value"]}
@@ -342,6 +344,9 @@ defmodule Mongo do
342344
@spec find_one_and_replace(GenServer.server, collection, BSON.document, BSON.document, Keyword.t) :: result(BSON.document)
343345
def find_one_and_replace(topology_pid, coll, filter, replacement, opts \\ []) do
344346
_ = modifier_docs(replacement, :replace)
347+
348+
write_concern = write_concern(opts)
349+
345350
cmd = [
346351
findAndModify: coll,
347352
query: filter,
@@ -352,7 +357,8 @@ defmodule Mongo do
352357
new: should_return_new(opts[:return_document]),
353358
sort: opts[:sort],
354359
upsert: opts[:upsert],
355-
collation: opts[:collation]
360+
collation: opts[:collation],
361+
writeConcern: write_concern
356362
] |> filter_nils()
357363

358364
opts = Keyword.drop(opts, ~w(bypass_document_validation max_time projection return_document sort upsert collation)a)
@@ -376,14 +382,18 @@ defmodule Mongo do
376382
"""
377383
@spec find_one_and_delete(GenServer.server, collection, BSON.document, Keyword.t) :: result(BSON.document)
378384
def find_one_and_delete(topology_pid, coll, filter, opts \\ []) do
385+
386+
write_concern = write_concern(opts)
387+
379388
cmd = [
380389
findAndModify: coll,
381390
query: filter,
382391
remove: true,
383392
maxTimeMS: opts[:max_time],
384393
fields: opts[:projection],
385394
sort: opts[:sort],
386-
collation: opts[:collation]
395+
collation: opts[:collation],
396+
writeConcern: write_concern
387397
] |> filter_nils()
388398
opts = Keyword.drop(opts, ~w(max_time projection sort collation)a)
389399

@@ -437,8 +447,8 @@ defmodule Mongo do
437447

438448
case documents do
439449
[%{"n" => count}] -> {:ok, count}
440-
[] -> {:error, Mongo.Error.exception(message: "nothing returned")}
441-
_ -> :ok # fixes {:error, :too_many_documents_returned}
450+
[] -> {:ok, 0}
451+
_ -> :error
442452
end
443453
end
444454

lib/mongo/bulk_write.ex

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ defmodule Mongo.BulkWrite do
298298
|> Enum.map(fn
299299
{:ok, %{"n" => n} = doc} -> BulkWriteResult.insert_result(n, ids, doc["writeErrors"] || [])
300300
{:ok, _other} -> BulkWriteResult.empty()
301+
{:error, reason} -> BulkWriteResult.error(reason)
301302
end)
302303
|> BulkWriteResult.reduce()
303304
end
@@ -310,6 +311,7 @@ defmodule Mongo.BulkWrite do
310311
BulkWriteResult.update_result(n - l, modified, l, filter_upsert_ids(ids), doc["writeErrors"] || [])
311312
{:ok, %{"n" => matched, "nModified" => modified} = doc} -> BulkWriteResult.update_result(matched, modified, 0, [], doc["writeErrors"] || [])
312313
{:ok, _other} -> BulkWriteResult.empty()
314+
{:error, reason} -> BulkWriteResult.error(reason)
313315
end)
314316
|> BulkWriteResult.reduce()
315317

@@ -320,6 +322,7 @@ defmodule Mongo.BulkWrite do
320322
|> Enum.map(fn
321323
{:ok, %{"n" => n} = doc } -> BulkWriteResult.delete_result(n, doc["writeErrors"] || [])
322324
{:ok, _other} -> BulkWriteResult.empty()
325+
{:error, reason} -> BulkWriteResult.error(reason)
323326
end)
324327
|> BulkWriteResult.reduce()
325328

lib/mongo/results.ex

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,10 @@ defmodule Mongo.BulkWriteResult do
108108
%BulkWriteResult{deleted_count: count, errors: errors}
109109
end
110110

111+
def error(error) do
112+
%BulkWriteResult{errors: [error]}
113+
end
114+
111115
def empty() do
112116
%BulkWriteResult{}
113117
end

lib/mongo/session.ex

Lines changed: 81 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@ defmodule Mongo.Session do
22

33
@moduledoc """
44
5-
see https://github.com/mongodb/specifications/blob/master/source/transactions/transactions.rst#committransaction
6-
7-
8-
see https://andrealeopardi.com/posts/connection-managers-with-gen_statem/
5+
For gen_statem look here
6+
* see https://github.com/mongodb/specifications/blob/master/source/transactions/transactions.rst#committransaction
7+
* see https://andrealeopardi.com/posts/connection-managers-with-gen_statem/
98
"""
109

1110
@behaviour :gen_statem
1211

1312
import Keywords
13+
import Mongo.WriteConcern
1414

1515
alias Mongo.Session.ServerSession
1616
alias Mongo.Session
@@ -114,6 +114,39 @@ defmodule Mongo.Session do
114114
end
115115
end
116116

117+
def with_transaction(topology_pid, fun, opts \\ []) do
118+
119+
with {:ok, session} <- Session.start_session(topology_pid, :write, opts),
120+
:ok <- Session.start_transaction(session) do
121+
122+
with {:ok, result} <- run_function(fun, Keyword.merge(opts, session: session)) do
123+
commit_transaction(session)
124+
end_session(topology_pid, session)
125+
{:ok, result}
126+
else
127+
error ->
128+
abort_transaction(session)
129+
end_session(topology_pid, session)
130+
## todo rerun
131+
error
132+
end
133+
134+
end
135+
136+
end
137+
138+
139+
defp run_function(fun, opts) do
140+
141+
## warte max 120ms, ansonsten kill
142+
try do
143+
rescue
144+
reason -> {:error, reason}
145+
end
146+
fun.(opts)
147+
148+
end
149+
117150
def connection(pid) do
118151
:gen_statem.call(pid, {:connection})
119152
end
@@ -136,43 +169,71 @@ defmodule Mongo.Session do
136169
end
137170

138171
@impl true
139-
def handle_event({:call, from}, {:start_transaction}, state, %Session{server_session: session} = data) when state in [:no_transaction, :transaction_aborted, :transaction_committed] do
172+
def handle_event({:call, from},
173+
{:start_transaction},
174+
transaction,
175+
%Session{server_session: session} = data) when transaction in [:no_transaction, :transaction_aborted, :transaction_committed] do
140176
{:next_state, :starting_transaction, %Session{data | server_session: ServerSession.next_txn_num(session)}, {:reply, from, :ok}}
141177
end
178+
142179
##
143-
# bind session: only if wire_version >= 6, MongoDB 3.6.x
180+
# bind session: only if wire_version >= 6, MongoDB 3.6.x and no transaction is running: only lsid is added
144181
#
145-
def handle_event({:call, from}, {:bind_session, cmd}, :no_transaction, %Session{conn: conn, wire_version: wire_version, server_session: %ServerSession{session_id: id}}) when wire_version >= 6 do
182+
def handle_event({:call, from},
183+
{:bind_session, cmd},
184+
transaction,
185+
%Session{conn: conn,
186+
wire_version: wire_version,
187+
server_session: %ServerSession{session_id: id}}) when wire_version >= 6 and transaction in [:no_transaction, :transaction_aborted, :transaction_committed] do
146188
{:keep_state_and_data, {:reply, from, {:ok, conn, Keyword.merge(cmd, lsid: %{id: id})}}}
147189
end
148-
def handle_event({:call, from}, {:bind_session, cmd}, :starting_transaction,
149-
%Session{conn: conn,
150-
server_session: %ServerSession{session_id: id, txn_num: txn_num},
151-
wire_version: wire_version, opts: opts} = data) when wire_version >= 6 do
190+
191+
def handle_event({:call, from},
192+
{:bind_session, cmd},
193+
:starting_transaction,
194+
%Session{conn: conn,
195+
server_session: %ServerSession{session_id: id, txn_num: txn_num},
196+
wire_version: wire_version,
197+
opts: opts} = data) when wire_version >= 6 do
152198
result = Keyword.merge(cmd,
153199
readConcern: Keyword.get(opts, :read_concern),
154200
lsid: %{id: id},
155201
txnNumber: %BSON.LongNumber{value: txn_num},
156202
startTransaction: true,
157-
autocommit: false) |> filter_nils()
203+
autocommit: false) |> filter_nils() |> Keyword.drop(~w(writeConcern)a)
204+
158205
{:next_state, :transaction_in_progress, data, {:reply, from, {:ok, conn, result}}}
159206
end
160-
def handle_event({:call, from}, {:bind_session, cmd}, :transaction_in_progress,
161-
%Session{conn: conn, wire_version: wire_version,
162-
server_session: %ServerSession{session_id: id, txn_num: txn_num}}) when wire_version >= 6 do
207+
208+
def handle_event({:call, from},
209+
{:bind_session, cmd},
210+
:transaction_in_progress,
211+
%Session{conn: conn, wire_version: wire_version,
212+
server_session: %ServerSession{session_id: id, txn_num: txn_num}}) when wire_version >= 6 do
163213
result = Keyword.merge(cmd,
164214
lsid: %{id: id},
165215
txnNumber: %BSON.LongNumber{value: txn_num},
166-
autocommit: false)
216+
autocommit: false) |> Keyword.drop(~w(writeConcern readConcern)a)
167217
{:keep_state_and_data, {:reply, from, {:ok, conn, result}}}
168218
end
169-
def handle_event({:call, from}, {:bind_session, cmd}, transaction, %Session{conn: conn}) when transaction in [:no_transaction, :starting_transaction, :transaction_in_progress] do
219+
220+
# In case of wire_version < 6 we do nothing
221+
def handle_event({:call, from},
222+
{:bind_session, cmd},
223+
_transaction,
224+
%Session{conn: conn}) do
170225
{:keep_state_and_data, {:reply, from, {:ok, conn, cmd}}}
171226
end
172227

228+
def handle_event({:call, from}, {:commit_transaction}, :starting_transaction, data) do
229+
{:next_state, :transaction_committed, data, {:reply, from, :ok}}
230+
end
173231
def handle_event({:call, from}, {:commit_transaction}, :transaction_in_progress, data) do
174232
{:next_state, :transaction_committed, data, {:reply, from, run_commit_command(data)}}
175233
end
234+
def handle_event({:call, from}, {:abort_transaction}, :starting_transaction, data) do
235+
{:next_state, :transaction_aborted, data, {:reply, from, :ok}}
236+
end
176237
def handle_event({:call, from}, {:abort_transaction}, :transaction_in_progress, data) do
177238
{:next_state, :transaction_aborted, data, {:reply, from, run_abort_command(data)}}
178239
end
@@ -215,7 +276,7 @@ defmodule Mongo.Session do
215276
lsid: %{id: id},
216277
txnNumber: %BSON.LongNumber{value: txn_num},
217278
autocommit: false,
218-
writeConcern: Keyword.get(opts, :write_concern),
279+
writeConcern: write_concern(opts),
219280
maxTimeMS: Keyword.get(opts, :max_commit_time_ms)
220281
] |> filter_nils()
221282

@@ -225,12 +286,13 @@ defmodule Mongo.Session do
225286
defp run_abort_command(%{conn: conn, server_session: %ServerSession{session_id: id, txn_num: txn_num}, opts: opts}) do
226287

227288
Logger.debug("Running abort transaction")
289+
228290
cmd = [
229291
abortTransaction: 1,
230292
lsid: %{id: id},
231293
txnNumber: %BSON.LongNumber{value: txn_num},
232294
autocommit: false,
233-
writeConcern: Keyword.get(opts, :write_concern),
295+
writeConcern: write_concern(opts)
234296
] |> filter_nils()
235297

236298
Mongo.exec_command(conn, cmd, database: "admin")

0 commit comments

Comments
 (0)