100
100
101
101
from bson .binary import Binary
102
102
from bson .int64 import Int64
103
- from bson .py3compat import abc , reraise_instance
103
+ from bson .py3compat import abc , integer_types , reraise_instance
104
104
from bson .son import SON
105
105
from bson .timestamp import Timestamp
106
106
@@ -158,18 +158,35 @@ class TransactionOptions(object):
158
158
"""Options for :meth:`ClientSession.start_transaction`.
159
159
160
160
:Parameters:
161
- - `read_concern`: The :class:`~pymongo.read_concern.ReadConcern` to use
162
- for this transaction.
163
- - `write_concern`: The :class:`~pymongo.write_concern.WriteConcern` to
164
- use for this transaction.
161
+ - `read_concern` (optional): The
162
+ :class:`~pymongo.read_concern.ReadConcern` to use for this transaction.
163
+ If ``None`` (the default) the :attr:`read_preference` of
164
+ the :class:`MongoClient` is used.
165
+ - `write_concern` (optional): The
166
+ :class:`~pymongo.write_concern.WriteConcern` to use for this
167
+ transaction. If ``None`` (the default) the :attr:`read_preference` of
168
+ the :class:`MongoClient` is used.
169
+ - `read_preference` (optional): The read preference to use. If
170
+ ``None`` (the default) the :attr:`read_preference` of this
171
+ :class:`MongoClient` is used. See :mod:`~pymongo.read_preferences`
172
+ for options. Transactions which read must use
173
+ :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY`.
174
+ - `max_commit_time_ms` (optional): The maximum amount of time to allow a
175
+ single commitTransaction command to run. This option is an alias for
176
+ maxTimeMS option on the commitTransaction command. If ``None`` (the
177
+ default) maxTimeMS is not used.
178
+
179
+ .. versionchanged:: 3.9
180
+ Added the ``max_commit_time_ms`` option.
165
181
166
182
.. versionadded:: 3.7
167
183
"""
168
184
def __init__ (self , read_concern = None , write_concern = None ,
169
- read_preference = None ):
185
+ read_preference = None , max_commit_time_ms = None ):
170
186
self ._read_concern = read_concern
171
187
self ._write_concern = write_concern
172
188
self ._read_preference = read_preference
189
+ self ._max_commit_time_ms = max_commit_time_ms
173
190
if read_concern is not None :
174
191
if not isinstance (read_concern , ReadConcern ):
175
192
raise TypeError ("read_concern must be an instance of "
@@ -189,6 +206,10 @@ def __init__(self, read_concern=None, write_concern=None,
189
206
raise TypeError ("%r is not valid for read_preference. See "
190
207
"pymongo.read_preferences for valid "
191
208
"options." % (read_preference ,))
209
+ if max_commit_time_ms is not None :
210
+ if not isinstance (max_commit_time_ms , integer_types ):
211
+ raise TypeError (
212
+ "max_commit_time_ms must be an integer or None" )
192
213
193
214
@property
194
215
def read_concern (self ):
@@ -206,6 +227,14 @@ def read_preference(self):
206
227
"""
207
228
return self ._read_preference
208
229
230
+ @property
231
+ def max_commit_time_ms (self ):
232
+ """The maxTimeMS to use when running a commitTransaction command.
233
+
234
+ .. versionadded:: 3.9
235
+ """
236
+ return self ._max_commit_time_ms
237
+
209
238
210
239
def _validate_session_write_concern (session , write_concern ):
211
240
"""Validate that an explicit session is not used with an unack'ed write.
@@ -279,10 +308,16 @@ def _reraise_with_unknown_commit(exc):
279
308
reraise_instance (exc , trace = sys .exc_info ()[2 ])
280
309
281
310
311
+ def _max_time_expired_error (exc ):
312
+ """Return true if exc is a MaxTimeMSExpired error."""
313
+ return isinstance (exc , OperationFailure ) and exc .code == 50
314
+
315
+
282
316
# From the transactions spec, all the retryable writes errors plus
283
317
# WriteConcernFailed.
284
318
_UNKNOWN_COMMIT_ERROR_CODES = _RETRYABLE_ERROR_CODES | frozenset ([
285
319
64 , # WriteConcernFailed
320
+ 50 , # MaxTimeMSExpired
286
321
])
287
322
288
323
# From the Convenient API for Transactions spec, with_transaction must
@@ -380,7 +415,7 @@ def _inherit_option(self, name, val):
380
415
return getattr (self .client , name )
381
416
382
417
def with_transaction (self , callback , read_concern = None , write_concern = None ,
383
- read_preference = None ):
418
+ read_preference = None , max_commit_time_ms = None ):
384
419
"""Execute a callback in a transaction.
385
420
386
421
This method starts a transaction on this session, executes ``callback``
@@ -465,7 +500,8 @@ def callback(session, custom_arg, custom_kwarg=None):
465
500
start_time = monotonic .time ()
466
501
while True :
467
502
self .start_transaction (
468
- read_concern , write_concern , read_preference )
503
+ read_concern , write_concern , read_preference ,
504
+ max_commit_time_ms )
469
505
try :
470
506
ret = callback (self )
471
507
except Exception as exc :
@@ -488,7 +524,8 @@ def callback(session, custom_arg, custom_kwarg=None):
488
524
self .commit_transaction ()
489
525
except PyMongoError as exc :
490
526
if (exc .has_error_label ("UnknownTransactionCommitResult" )
491
- and _within_time_limit (start_time )):
527
+ and _within_time_limit (start_time )
528
+ and not _max_time_expired_error (exc )):
492
529
# Retry the commit.
493
530
continue
494
531
@@ -502,11 +539,14 @@ def callback(session, custom_arg, custom_kwarg=None):
502
539
return ret
503
540
504
541
def start_transaction (self , read_concern = None , write_concern = None ,
505
- read_preference = None ):
542
+ read_preference = None , max_commit_time_ms = None ):
506
543
"""Start a multi-statement transaction.
507
544
508
545
Takes the same arguments as :class:`TransactionOptions`.
509
546
547
+ .. versionchanged:: 3.9
548
+ Added the ``max_commit_time_ms`` option.
549
+
510
550
.. versionadded:: 3.7
511
551
"""
512
552
self ._check_ended ()
@@ -518,9 +558,13 @@ def start_transaction(self, read_concern=None, write_concern=None,
518
558
write_concern = self ._inherit_option ("write_concern" , write_concern )
519
559
read_preference = self ._inherit_option (
520
560
"read_preference" , read_preference )
561
+ if max_commit_time_ms is None :
562
+ opts = self .options .default_transaction_options
563
+ if opts :
564
+ max_commit_time_ms = opts .max_commit_time_ms
521
565
522
566
self ._transaction .opts = TransactionOptions (
523
- read_concern , write_concern , read_preference )
567
+ read_concern , write_concern , read_preference , max_commit_time_ms )
524
568
self ._transaction .reset ()
525
569
self ._transaction .state = _TxnState .STARTING
526
570
self ._start_retryable_write ()
@@ -631,18 +675,25 @@ def _finish_transaction_with_retry(self, command_name, explict_retry):
631
675
raise exc
632
676
633
677
def _finish_transaction (self , command_name , retrying ):
634
- # Transaction spec says that after the initial commit attempt,
635
- # subsequent commitTransaction commands should be upgraded to use
636
- # w:"majority" and set a default value of 10 seconds for wtimeout.
637
- wc = self ._transaction .opts .write_concern
638
- if retrying and command_name == "commitTransaction" :
639
- wc_doc = wc .document
640
- wc_doc ["w" ] = "majority"
641
- wc_doc .setdefault ("wtimeout" , 10000 )
642
- wc = WriteConcern (** wc_doc )
678
+ opts = self ._transaction .opts
679
+ wc = opts .write_concern
643
680
cmd = SON ([(command_name , 1 )])
681
+ if command_name == "commitTransaction" :
682
+ if opts .max_commit_time_ms :
683
+ cmd ['maxTimeMS' ] = opts .max_commit_time_ms
684
+
685
+ # Transaction spec says that after the initial commit attempt,
686
+ # subsequent commitTransaction commands should be upgraded to use
687
+ # w:"majority" and set a default value of 10 seconds for wtimeout.
688
+ if retrying :
689
+ wc_doc = wc .document
690
+ wc_doc ["w" ] = "majority"
691
+ wc_doc .setdefault ("wtimeout" , 10000 )
692
+ wc = WriteConcern (** wc_doc )
693
+
644
694
if self ._transaction .recovery_token :
645
695
cmd ['recoveryToken' ] = self ._transaction .recovery_token
696
+
646
697
with self ._client ._socket_for_writes (self ) as sock_info :
647
698
return self ._client .admin ._command (
648
699
sock_info ,
0 commit comments