Skip to content

Commit 35bb8f5

Browse files
authored
Merge branch 'master' into ps_remove_graph
2 parents 6ec18d1 + 09b1376 commit 35bb8f5

File tree

16 files changed

+173
-43
lines changed

16 files changed

+173
-43
lines changed

docs/advanced_features.rst

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -380,8 +380,6 @@ run_in_thread.
380380
>>> def exception_handler(ex, pubsub, thread):
381381
>>> print(ex)
382382
>>> thread.stop()
383-
>>> thread.join(timeout=1.0)
384-
>>> pubsub.close()
385383
>>> thread = p.run_in_thread(exception_handler=exception_handler)
386384
387385
A PubSub object adheres to the same encoding semantics as the client

pyproject.toml

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,17 @@ classifiers = [
3232
dependencies = ['async-timeout>=4.0.3; python_full_version<"3.11.3"']
3333

3434
[project.optional-dependencies]
35-
hiredis = ["hiredis>=3.0.0"]
36-
ocsp = ["cryptography>=36.0.1", "pyopenssl==20.0.1", "requests>=2.31.0"]
37-
jwt = ["PyJWT~=2.9.0"]
35+
hiredis = [
36+
"hiredis>=3.0.0",
37+
]
38+
ocsp = [
39+
"cryptography>=36.0.1",
40+
"pyopenssl>=20.0.1",
41+
"requests>=2.31.0",
42+
]
43+
jwt = [
44+
"PyJWT~=2.9.0",
45+
]
3846

3947
[project.urls]
4048
Changes = "https://github.com/redis/redis-py/releases"

redis/asyncio/client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,7 @@ def lock(
478478
blocking_timeout: Optional[float] = None,
479479
lock_class: Optional[Type[Lock]] = None,
480480
thread_local: bool = True,
481+
raise_on_release_error: bool = True,
481482
) -> Lock:
482483
"""
483484
Return a new Lock object using key ``name`` that mimics
@@ -524,6 +525,11 @@ def lock(
524525
thread-1 would see the token value as "xyz" and would be
525526
able to successfully release the thread-2's lock.
526527
528+
``raise_on_release_error`` indicates whether to raise an exception when
529+
the lock is no longer owned when exiting the context manager. By default,
530+
this is True, meaning an exception will be raised. If False, the warning
531+
will be logged and the exception will be suppressed.
532+
527533
In some use cases it's necessary to disable thread local storage. For
528534
example, if you have code where one thread acquires a lock and passes
529535
that lock instance to a worker thread to release later. If thread
@@ -541,6 +547,7 @@ def lock(
541547
blocking=blocking,
542548
blocking_timeout=blocking_timeout,
543549
thread_local=thread_local,
550+
raise_on_release_error=raise_on_release_error,
544551
)
545552

546553
def pubsub(self, **kwargs) -> "PubSub":

redis/asyncio/cluster.py

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -839,6 +839,7 @@ def lock(
839839
blocking_timeout: Optional[float] = None,
840840
lock_class: Optional[Type[Lock]] = None,
841841
thread_local: bool = True,
842+
raise_on_release_error: bool = True,
842843
) -> Lock:
843844
"""
844845
Return a new Lock object using key ``name`` that mimics
@@ -885,6 +886,11 @@ def lock(
885886
thread-1 would see the token value as "xyz" and would be
886887
able to successfully release the thread-2's lock.
887888
889+
``raise_on_release_error`` indicates whether to raise an exception when
890+
the lock is no longer owned when exiting the context manager. By default,
891+
this is True, meaning an exception will be raised. If False, the warning
892+
will be logged and the exception will be suppressed.
893+
888894
In some use cases it's necessary to disable thread local storage. For
889895
example, if you have code where one thread acquires a lock and passes
890896
that lock instance to a worker thread to release later. If thread
@@ -902,6 +908,7 @@ def lock(
902908
blocking=blocking,
903909
blocking_timeout=blocking_timeout,
904910
thread_local=thread_local,
911+
raise_on_release_error=raise_on_release_error,
905912
)
906913

907914

@@ -1597,18 +1604,24 @@ async def _execute(
15971604
result.args = (msg,) + result.args[1:]
15981605
raise result
15991606

1600-
default_node = nodes.get(client.get_default_node().name)
1601-
if default_node is not None:
1602-
# This pipeline execution used the default node, check if we need
1603-
# to replace it.
1604-
# Note: when the error is raised we'll reset the default node in the
1605-
# caller function.
1606-
for cmd in default_node[1]:
1607-
# Check if it has a command that failed with a relevant
1608-
# exception
1609-
if type(cmd.result) in self.__class__.ERRORS_ALLOW_RETRY:
1610-
client.replace_default_node()
1611-
break
1607+
default_cluster_node = client.get_default_node()
1608+
1609+
# Check whether the default node was used. In some cases,
1610+
# 'client.get_default_node()' may return None. The check below
1611+
# prevents a potential AttributeError.
1612+
if default_cluster_node is not None:
1613+
default_node = nodes.get(default_cluster_node.name)
1614+
if default_node is not None:
1615+
# This pipeline execution used the default node, check if we need
1616+
# to replace it.
1617+
# Note: when the error is raised we'll reset the default node in the
1618+
# caller function.
1619+
for cmd in default_node[1]:
1620+
# Check if it has a command that failed with a relevant
1621+
# exception
1622+
if type(cmd.result) in self.__class__.ERRORS_ALLOW_RETRY:
1623+
client.replace_default_node()
1624+
break
16121625

16131626
return [cmd.result for cmd in stack]
16141627

redis/asyncio/lock.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import logging
23
import threading
34
import uuid
45
from types import SimpleNamespace
@@ -10,6 +11,8 @@
1011
if TYPE_CHECKING:
1112
from redis.asyncio import Redis, RedisCluster
1213

14+
logger = logging.getLogger(__name__)
15+
1316

1417
class Lock:
1518
"""
@@ -85,6 +88,7 @@ def __init__(
8588
blocking: bool = True,
8689
blocking_timeout: Optional[Number] = None,
8790
thread_local: bool = True,
91+
raise_on_release_error: bool = True,
8892
):
8993
"""
9094
Create a new Lock instance named ``name`` using the Redis client
@@ -128,6 +132,11 @@ def __init__(
128132
thread-1 would see the token value as "xyz" and would be
129133
able to successfully release the thread-2's lock.
130134
135+
``raise_on_release_error`` indicates whether to raise an exception when
136+
the lock is no longer owned when exiting the context manager. By default,
137+
this is True, meaning an exception will be raised. If False, the warning
138+
will be logged and the exception will be suppressed.
139+
131140
In some use cases it's necessary to disable thread local storage. For
132141
example, if you have code where one thread acquires a lock and passes
133142
that lock instance to a worker thread to release later. If thread
@@ -144,6 +153,7 @@ def __init__(
144153
self.blocking_timeout = blocking_timeout
145154
self.thread_local = bool(thread_local)
146155
self.local = threading.local() if self.thread_local else SimpleNamespace()
156+
self.raise_on_release_error = raise_on_release_error
147157
self.local.token = None
148158
self.register_scripts()
149159

@@ -163,7 +173,14 @@ async def __aenter__(self):
163173
raise LockError("Unable to acquire lock within the time specified")
164174

165175
async def __aexit__(self, exc_type, exc_value, traceback):
166-
await self.release()
176+
try:
177+
await self.release()
178+
except LockError:
179+
if self.raise_on_release_error:
180+
raise
181+
logger.warning(
182+
"Lock was unlocked or no longer owned when exiting context manager."
183+
)
167184

168185
async def acquire(
169186
self,

redis/client.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,7 @@ def lock(
473473
blocking_timeout: Optional[float] = None,
474474
lock_class: Union[None, Any] = None,
475475
thread_local: bool = True,
476+
raise_on_release_error: bool = True,
476477
):
477478
"""
478479
Return a new Lock object using key ``name`` that mimics
@@ -519,6 +520,11 @@ def lock(
519520
thread-1 would see the token value as "xyz" and would be
520521
able to successfully release the thread-2's lock.
521522
523+
``raise_on_release_error`` indicates whether to raise an exception when
524+
the lock is no longer owned when exiting the context manager. By default,
525+
this is True, meaning an exception will be raised. If False, the warning
526+
will be logged and the exception will be suppressed.
527+
522528
In some use cases it's necessary to disable thread local storage. For
523529
example, if you have code where one thread acquires a lock and passes
524530
that lock instance to a worker thread to release later. If thread
@@ -536,6 +542,7 @@ def lock(
536542
blocking=blocking,
537543
blocking_timeout=blocking_timeout,
538544
thread_local=thread_local,
545+
raise_on_release_error=raise_on_release_error,
539546
)
540547

541548
def pubsub(self, **kwargs):
@@ -950,7 +957,7 @@ def check_health(self) -> None:
950957
"did you forget to call subscribe() or psubscribe()?"
951958
)
952959

953-
if conn.health_check_interval and time.time() > conn.next_health_check:
960+
if conn.health_check_interval and time.monotonic() > conn.next_health_check:
954961
conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False)
955962
self.health_check_response_counter += 1
956963

@@ -1100,12 +1107,12 @@ def get_message(
11001107
"""
11011108
if not self.subscribed:
11021109
# Wait for subscription
1103-
start_time = time.time()
1110+
start_time = time.monotonic()
11041111
if self.subscribed_event.wait(timeout) is True:
11051112
# The connection was subscribed during the timeout time frame.
11061113
# The timeout should be adjusted based on the time spent
11071114
# waiting for the subscription
1108-
time_spent = time.time() - start_time
1115+
time_spent = time.monotonic() - start_time
11091116
timeout = max(0.0, timeout - time_spent)
11101117
else:
11111118
# The connection isn't subscribed to any channels or patterns,

redis/cluster.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -821,6 +821,7 @@ def lock(
821821
blocking_timeout=None,
822822
lock_class=None,
823823
thread_local=True,
824+
raise_on_release_error: bool = True,
824825
):
825826
"""
826827
Return a new Lock object using key ``name`` that mimics
@@ -867,6 +868,11 @@ def lock(
867868
thread-1 would see the token value as "xyz" and would be
868869
able to successfully release the thread-2's lock.
869870
871+
``raise_on_release_error`` indicates whether to raise an exception when
872+
the lock is no longer owned when exiting the context manager. By default,
873+
this is True, meaning an exception will be raised. If False, the warning
874+
will be logged and the exception will be suppressed.
875+
870876
In some use cases it's necessary to disable thread local storage. For
871877
example, if you have code where one thread acquires a lock and passes
872878
that lock instance to a worker thread to release later. If thread
@@ -884,6 +890,7 @@ def lock(
884890
blocking=blocking,
885891
blocking_timeout=blocking_timeout,
886892
thread_local=thread_local,
893+
raise_on_release_error=raise_on_release_error,
887894
)
888895

889896
def set_response_callback(self, command, callback):

redis/commands/search/commands.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,7 @@ def search(
500500
For more information see `FT.SEARCH <https://redis.io/commands/ft.search>`_.
501501
""" # noqa
502502
args, query = self._mk_query_args(query, query_params=query_params)
503-
st = time.time()
503+
st = time.monotonic()
504504

505505
options = {}
506506
if get_protocol_version(self.client) not in ["3", 3]:
@@ -512,7 +512,7 @@ def search(
512512
return res
513513

514514
return self._parse_results(
515-
SEARCH_CMD, res, query=query, duration=(time.time() - st) * 1000.0
515+
SEARCH_CMD, res, query=query, duration=(time.monotonic() - st) * 1000.0
516516
)
517517

518518
def explain(
@@ -602,7 +602,7 @@ def profile(
602602
Each parameter has a name and a value.
603603
604604
"""
605-
st = time.time()
605+
st = time.monotonic()
606606
cmd = [PROFILE_CMD, self.index_name, ""]
607607
if limited:
608608
cmd.append("LIMITED")
@@ -621,7 +621,7 @@ def profile(
621621
res = self.execute_command(*cmd)
622622

623623
return self._parse_results(
624-
PROFILE_CMD, res, query=query, duration=(time.time() - st) * 1000.0
624+
PROFILE_CMD, res, query=query, duration=(time.monotonic() - st) * 1000.0
625625
)
626626

627627
def spellcheck(self, query, distance=None, include=None, exclude=None):
@@ -940,7 +940,7 @@ async def search(
940940
For more information see `FT.SEARCH <https://redis.io/commands/ft.search>`_.
941941
""" # noqa
942942
args, query = self._mk_query_args(query, query_params=query_params)
943-
st = time.time()
943+
st = time.monotonic()
944944

945945
options = {}
946946
if get_protocol_version(self.client) not in ["3", 3]:
@@ -952,7 +952,7 @@ async def search(
952952
return res
953953

954954
return self._parse_results(
955-
SEARCH_CMD, res, query=query, duration=(time.time() - st) * 1000.0
955+
SEARCH_CMD, res, query=query, duration=(time.monotonic() - st) * 1000.0
956956
)
957957

958958
async def aggregate(

redis/connection.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44
import ssl
55
import sys
66
import threading
7+
import time
78
import weakref
89
from abc import abstractmethod
910
from itertools import chain
1011
from queue import Empty, Full, LifoQueue
11-
from time import time
1212
from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, Union
1313
from urllib.parse import parse_qs, unquote, urlparse
1414

@@ -542,7 +542,7 @@ def _ping_failed(self, error):
542542

543543
def check_health(self):
544544
"""Check the health of the connection with a PING/PONG"""
545-
if self.health_check_interval and time() > self.next_health_check:
545+
if self.health_check_interval and time.monotonic() > self.next_health_check:
546546
self.retry.call_with_retry(self._send_ping, self._ping_failed)
547547

548548
def send_packed_command(self, command, check_health=True):
@@ -632,7 +632,7 @@ def read_response(
632632
raise
633633

634634
if self.health_check_interval:
635-
self.next_health_check = time() + self.health_check_interval
635+
self.next_health_check = time.monotonic() + self.health_check_interval
636636

637637
if isinstance(response, ResponseError):
638638
try:

redis/exceptions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ def __init__(self, message=None, lock_name=None):
8989

9090

9191
class LockNotOwnedError(LockError):
92-
"Error trying to extend or release a lock that is (no longer) owned"
92+
"Error trying to extend or release a lock that is not owned (anymore)"
9393

9494
pass
9595

0 commit comments

Comments
 (0)