@@ -193,9 +193,12 @@ defmodule Mongo.Session do
193
193
@ doc """
194
194
Commit the current transation.
195
195
"""
196
- @ spec commit_transaction ( Session . t ) :: :ok | { :error , term ( ) }
196
+ @ spec commit_transaction ( Session . t , DateTime . t ) :: :ok | { :error , term ( ) }
197
197
def commit_transaction ( pid ) do
198
- call ( pid , :commit_transaction )
198
+ call ( pid , { :commit_transaction , DateTime . utc_now ( ) } )
199
+ end
200
+ def commit_transaction ( pid , start_time ) do
201
+ call ( pid , { :commit_transaction , start_time } )
199
202
end
200
203
201
204
@ doc """
@@ -277,37 +280,65 @@ defmodule Mongo.Session do
277
280
@ doc """
278
281
Convenient function for running multiple write commands in a transaction.
279
282
283
+ In case of `TransientTransactionError` or `UnknownTransactionCommitResult` the function will retry the whole transaction or
284
+ the commit of the transaction. You can specify a timeout (`:transaction_retry_timeout_s`) to limit the time of repeating.
285
+ The default value is 120 seconds. If you don't wait so long, you call `with_transaction` with the
286
+ option `transaction_retry_timeout_s: 10`. In this case after 10 seconds of retrying, the function will return
287
+ an error.
288
+
280
289
## Example
290
+
281
291
alias Mongo.Session
282
292
283
293
{:ok, ids} = Session.with_transaction(top, fn opts ->
284
294
{:ok, %InsertOneResult{:inserted_id => id1}} = Mongo.insert_one(top, "dogs", %{name: "Greta"}, opts)
285
295
{:ok, %InsertOneResult{:inserted_id => id2}} = Mongo.insert_one(top, "dogs", %{name: "Waldo"}, opts)
286
296
{:ok, %InsertOneResult{:inserted_id => id3}} = Mongo.insert_one(top, "dogs", %{name: "Tom"}, opts)
287
297
{:ok, [id1, id2, id3]}
288
- end, w: 1)
298
+ end, transaction_retry_timeout_s: 10)
299
+
300
+ From the specs:
301
+
302
+ The callback function may be executed multiple times
303
+
304
+ The implementation of `with_transaction` is based on the original examples for Retry Transactions and
305
+ Commit Operation from the MongoDB Manual. As such, the callback may be executed any number of times.
306
+ Drivers are free to encourage their users to design idempotent callbacks.
289
307
290
308
"""
291
309
@ spec with_transaction ( Session . t , ( keyword ( ) -> { :ok , any ( ) } | :error ) ) :: { :ok , any ( ) } | :error | { :error , term }
292
310
def with_transaction ( topology_pid , fun , opts \\ [ ] ) do
293
-
294
311
with { :ok , session } <- Session . start_session ( topology_pid , :write , opts ) ,
295
- :ok <- Session . start_transaction ( session ) do
296
-
297
- with { :ok , result } <- run_function ( fun , Keyword . merge ( opts , session: session ) ) ,
298
- commit_result <- commit_transaction ( session ) do
312
+ result <- run_in_transaction ( topology_pid , session , fun , DateTime . utc_now ( ) , opts ) ,
313
+ :ok <- end_session ( topology_pid , session ) do
314
+ result
315
+ end
316
+ end
317
+ def run_in_transaction ( topology_pid , session , fun , start_time , opts ) do
318
+ with :ok <- Session . start_transaction ( session ) ,
319
+ { :ok , result } <- run_function ( fun , Keyword . merge ( opts , session: session ) ) ,
320
+ commit_result <- commit_transaction ( session , start_time ) do
299
321
300
- end_session ( topology_pid , session )
301
- case commit_result do
302
- :ok -> { :ok , result }
303
- error -> error
304
- end
305
- else
322
+ ## check the result
323
+ case commit_result do
324
+ :ok -> { :ok , result } ## everything is okay
306
325
error ->
307
- abort_transaction ( session )
308
- end_session ( topology_pid , session )
326
+ abort_transaction ( session ) ## the rest is an error
309
327
error
310
328
end
329
+ else
330
+
331
+ { :error , error } ->
332
+ abort_transaction ( session ) ## check in case of an error while processing transaction
333
+ timeout = opts [ :transaction_retry_timeout_s ] || @ retry_timeout_seconds
334
+ case Error . has_label ( error , "TransientTransactionError" ) && DateTime . diff ( DateTime . utc_now ( ) , start_time , :second ) < timeout do
335
+ true -> run_in_transaction ( topology_pid , session , fun , start_time , opts )
336
+ false -> { :error , error }
337
+ end
338
+
339
+ other ->
340
+ abort_transaction ( session ) ## everything else is an error
341
+ { :error , other }
311
342
end
312
343
end
313
344
@@ -316,7 +347,6 @@ defmodule Mongo.Session do
316
347
#
317
348
defp run_function ( fun , opts ) do
318
349
319
- ## todo wait max 120s
320
350
try do
321
351
fun . ( opts )
322
352
rescue
@@ -470,22 +500,28 @@ defmodule Mongo.Session do
470
500
def handle_call_event ( { :bind_session , cmd } , _transaction , % Session { conn: conn } ) do
471
501
{ :keep_state_and_data , { :ok , conn , cmd } }
472
502
end
473
- def handle_call_event ( :commit_transaction , :starting_transaction , _data ) do
503
+ def handle_call_event ( { :commit_transaction , _start_time } , :starting_transaction , _data ) do
474
504
{ :next_state , :transaction_committed , :ok }
475
505
end
476
- def handle_call_event ( :commit_transaction , :transaction_in_progress , data ) do
477
- with :ok <- run_commit_command ( data ) do
506
+ def handle_call_event ( { :commit_transaction , start_time } , :transaction_in_progress , data ) do
507
+ with :ok <- run_commit_command ( data , start_time ) do
478
508
{ :next_state , :transaction_committed , :ok }
479
509
else
480
510
error -> { :keep_state_and_data , error }
481
511
end
482
512
end
513
+ def handle_call_event ( { :commit_transaction , _start_time } , _state , _data ) do ## in other cases we will ignore the commit command
514
+ { :keep_state_and_data , :ok }
515
+ end
483
516
def handle_call_event ( :abort_transaction , :starting_transaction , _data ) do
484
517
{ :next_state , :transaction_aborted , :ok }
485
518
end
486
519
def handle_call_event ( :abort_transaction , :transaction_in_progress , data ) do
487
520
{ :next_state , :transaction_aborted , run_abort_command ( data ) }
488
521
end
522
+ def handle_call_event ( :abort_transaction , _state , _data ) do
523
+ { :keep_state_and_data , :ok }
524
+ end
489
525
def handle_call_event ( :connection , _state , % { conn: conn } ) do
490
526
{ :keep_state_and_data , conn }
491
527
end
@@ -523,8 +559,8 @@ defmodule Mongo.Session do
523
559
##
524
560
# Run the commit transaction command.
525
561
#
526
- defp run_commit_command ( session ) do
527
- run_commit_command ( session , DateTime . utc_now ( ) , :first )
562
+ defp run_commit_command ( session , start_time ) do
563
+ run_commit_command ( session , start_time , :first )
528
564
end
529
565
530
566
defp run_commit_command ( % Session { conn: conn ,
@@ -544,7 +580,7 @@ defmodule Mongo.Session do
544
580
lsid: % { id: id } ,
545
581
txnNumber: % BSON.LongNumber { value: txn_num } ,
546
582
autocommit: false ,
547
- writeConcern: write_concern , ## todo: w:majority
583
+ writeConcern: write_concern ,
548
584
maxTimeMS: max_time_ms ( opts ) ,
549
585
recoveryToken: recovery_token
550
586
] |> filter_nils ( )
@@ -553,7 +589,8 @@ defmodule Mongo.Session do
553
589
:ok
554
590
else
555
591
{ :error , error } ->
556
- try_again = Error . has_label ( error , "UnknownTransactionCommitResult" ) && DateTime . diff ( DateTime . utc_now ( ) , time , :second ) < @ retry_timeout_seconds
592
+ timeout = opts [ :transaction_retry_timeout_s ] || @ retry_timeout_seconds
593
+ try_again = Error . has_label ( error , "UnknownTransactionCommitResult" ) && DateTime . diff ( DateTime . utc_now ( ) , time , :second ) < timeout
557
594
case try_again do
558
595
true -> run_commit_command ( session , time , :retry )
559
596
false -> { :error , error }
0 commit comments