Skip to content

Commit 00c862e

Browse files
committed
Improved exception handling
1 parent 0b4716b commit 00c862e

File tree

1 file changed

+11
-29
lines changed

1 file changed

+11
-29
lines changed

redis/cluster.py

Lines changed: 11 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3023,15 +3023,15 @@ class TransactionStrategy(AbstractStrategy):
30233023
NO_SLOTS_COMMANDS = {"UNWATCH"}
30243024
IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
30253025
UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
3026+
SLOT_REDIRECT_ERRORS = (AskError, MovedError)
3027+
CONNECTION_ERRORS = (ConnectionError,OSError,ClusterDownError)
30263028

30273029
def __init__(self, pipe: ClusterPipeline):
30283030
super().__init__(pipe)
30293031
self._explicit_transaction = False
30303032
self._watching = False
30313033
self._pipeline_slots: Set[int] = set()
30323034
self._transaction_connection: Optional[Connection] = None
3033-
self._cluster_error = False
3034-
self._slot_migrating = False
30353035
self._executing = False
30363036

30373037
def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]:
@@ -3125,28 +3125,19 @@ def _send_command_parse_response(
31253125
Send a command and parse the response
31263126
"""
31273127

3128-
self.slot_migrating = False
3129-
try:
3130-
conn.send_command(*args)
3131-
output = redis_node.parse_response(conn, command_name, **options)
3132-
3133-
except (AskError, MovedError) as slot_error:
3134-
self.slot_migrating = True
3135-
raise slot_error
3136-
except ConnectionError as conn_error:
3137-
self._cluster_error = True
3138-
raise conn_error
3128+
conn.send_command(*args)
3129+
output = redis_node.parse_response(conn, command_name, **options)
31393130

31403131
if command_name in self.UNWATCH_COMMANDS:
31413132
self._watching = False
31423133
return output
31433134

31443135
def _reinitialize_on_error(self, error):
31453136
if self._watching:
3146-
if self.slot_migrating and self._executing:
3137+
if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing:
31473138
raise WatchError("Slot rebalancing occurred while watching keys")
31483139

3149-
if self.slot_migrating or self._cluster_error:
3140+
if type(error) in self.SLOT_REDIRECT_ERRORS or type(error) in self.CONNECTION_ERRORS:
31503141
if self._transaction_connection:
31513142
self._transaction_connection = None
31523143

@@ -3157,8 +3148,6 @@ def _reinitialize_on_error(self, error):
31573148
else:
31583149
self._nodes_manager.update_moved_exception(error)
31593150

3160-
self.slot_migrating = False
3161-
self._cluster_error = False
31623151
self._executing = False
31633152

31643153
def _raise_first_error(self, responses, stack):
@@ -3196,8 +3185,6 @@ def _execute_transaction(
31963185
)
31973186

31983187
self._executing = True
3199-
self.slot_migrating = False
3200-
self._cluster_error = False
32013188

32023189
redis_node, connection = self._get_client_and_connection_for_transaction()
32033190

@@ -3220,8 +3207,7 @@ def _execute_transaction(
32203207
except ResponseError as e:
32213208
self.annotate_exception(e, 0, "MULTI")
32223209
errors.append(e)
3223-
except (ClusterDownError, ConnectionError) as cluster_error:
3224-
self._cluster_error = True
3210+
except self.CONNECTION_ERRORS as cluster_error:
32253211
self.annotate_exception(cluster_error, 0, "MULTI")
32263212
raise
32273213

@@ -3232,12 +3218,10 @@ def _execute_transaction(
32323218
else:
32333219
try:
32343220
_ = redis_node.parse_response(connection, "_")
3235-
except (AskError, MovedError) as slot_error:
3236-
self.slot_migrating = True
3221+
except self.SLOT_REDIRECT_ERRORS as slot_error:
32373222
self.annotate_exception(slot_error, i + 1, command.args)
32383223
errors.append(slot_error)
3239-
except (ClusterDownError, ConnectionError) as cluster_error:
3240-
self._cluster_error = True
3224+
except self.CONNECTION_ERRORS as cluster_error:
32413225
self.annotate_exception(cluster_error, i + 1, command.args)
32423226
raise
32433227
except ResponseError as e:
@@ -3274,7 +3258,7 @@ def _execute_transaction(
32743258
)
32753259

32763260
# find any errors in the response and raise if necessary
3277-
if raise_on_error or self.slot_migrating:
3261+
if raise_on_error or len(errors) > 0:
32783262
self._raise_first_error(
32793263
response,
32803264
self._command_queue,
@@ -3312,7 +3296,7 @@ def reset(self):
33123296
self._transaction_connection
33133297
)
33143298
self._transaction_connection = None
3315-
except ConnectionError:
3299+
except self.CONNECTION_ERRORS:
33163300
# disconnect will also remove any previous WATCHes
33173301
if self._transaction_connection:
33183302
self._transaction_connection.disconnect()
@@ -3321,8 +3305,6 @@ def reset(self):
33213305
self._watching = False
33223306
self._explicit_transaction = False
33233307
self._pipeline_slots = set()
3324-
self._slot_migrating = False
3325-
self._cluster_error = False
33263308
self._executing = False
33273309

33283310
def send_cluster_commands(

0 commit comments

Comments
 (0)