@@ -112,6 +112,9 @@ def __init__(
112
112
self .uses_hint_update = False
113
113
self .uses_hint_delete = False
114
114
self .uses_sort = False
115
+ self .is_retryable = True
116
+ self .retrying = False
117
+ self .started_retryable_write = False
115
118
# Extra state so that we know where to pick up on a retry attempt.
116
119
self .current_run = None
117
120
self .next_run = None
@@ -127,23 +130,23 @@ def bulk_ctx_class(self) -> Type[_BulkWriteContext]:
127
130
self .is_encrypted = False
128
131
return _BulkWriteContext
129
132
130
- @property
131
- def is_retryable (self ) -> bool :
132
- if self .current_run :
133
- return self .current_run .is_retryable
134
- return True
135
-
136
- @property
137
- def retrying (self ) -> bool :
138
- if self .current_run :
139
- return self .current_run .retrying
140
- return False
141
-
142
- @property
143
- def started_retryable_write (self ) -> bool :
144
- if self .current_run :
145
- return self .current_run .started_retryable_write
146
- return False
133
+ # @property
134
+ # def is_retryable(self) -> bool:
135
+ # if self.current_run:
136
+ # return self.current_run.is_retryable
137
+ # return True
138
+ #
139
+ # @property
140
+ # def retrying(self) -> bool:
141
+ # if self.current_run:
142
+ # return self.current_run.retrying
143
+ # return False
144
+ #
145
+ # @property
146
+ # def started_retryable_write(self) -> bool:
147
+ # if self.current_run:
148
+ # return self.current_run.started_retryable_write
149
+ # return False
147
150
148
151
def add_insert (self , document : _DocumentOut ) -> bool :
149
152
"""Add an insert document to the list of ops."""
@@ -255,7 +258,7 @@ def gen_ordered(
255
258
yield run
256
259
run = _Run (op_type )
257
260
run .add (idx , operation )
258
- run .is_retryable = run .is_retryable and retryable
261
+ self .is_retryable = self .is_retryable and retryable
259
262
if run is None :
260
263
raise InvalidOperation ("No operations to execute" )
261
264
yield run
@@ -273,7 +276,7 @@ def gen_unordered(
273
276
retryable = process (request )
274
277
(op_type , operation ) = self .ops [idx ]
275
278
operations [op_type ].add (idx , operation )
276
- operations [ op_type ] .is_retryable = operations [ op_type ] .is_retryable and retryable
279
+ self .is_retryable = self .is_retryable and retryable
277
280
if (
278
281
len (operations [_INSERT ].ops ) == 0
279
282
and len (operations [_UPDATE ].ops ) == 0
@@ -533,7 +536,7 @@ async def _execute_command(
533
536
last_run = False
534
537
535
538
while run :
536
- if not run .retrying :
539
+ if not self .retrying :
537
540
self .next_run = next (generator , None )
538
541
if self .next_run is None :
539
542
last_run = True
@@ -567,10 +570,10 @@ async def _execute_command(
567
570
if session :
568
571
# Start a new retryable write unless one was already
569
572
# started for this command.
570
- if run .is_retryable and not run .started_retryable_write :
573
+ if self .is_retryable and not self .started_retryable_write :
571
574
session ._start_retryable_write ()
572
575
self .started_retryable_write = True
573
- session ._apply_to (cmd , run .is_retryable , ReadPreference .PRIMARY , conn )
576
+ session ._apply_to (cmd , self .is_retryable , ReadPreference .PRIMARY , conn )
574
577
conn .send_cluster_time (cmd , session , client )
575
578
conn .add_server_api (cmd )
576
579
# CSOT: apply timeout before encoding the command.
0 commit comments