 import asyncio
-import contextlib
 import datetime
 import logging
 from asyncio import Task
-from collections.abc import AsyncIterator
 from dataclasses import dataclass, field
 from uuid import uuid4

 import redis.asyncio as aioredis
 import redis.exceptions
-from pydantic import NonNegativeFloat
 from redis.asyncio.lock import Lock
 from redis.asyncio.retry import Retry
 from redis.backoff import ExponentialBackoff
 from tenacity import retry
 from yarl import URL

-from ..background_task import periodic_task
 from ..logging_utils import log_catch
 from ..retry_policies import RedisRetryPolicyUponInitialization
 from ._constants import (
     DEFAULT_LOCK_TTL,
     DEFAULT_SOCKET_TIMEOUT,
 )
-from ._errors import CouldNotAcquireLockError, CouldNotConnectToRedisError
-from ._utils import auto_extend_lock, cancel_or_warn
+from ._errors import CouldNotConnectToRedisError
+from ._utils import cancel_or_warn

 _logger = logging.getLogger(__name__)

@@ -117,77 +113,6 @@ def is_healthy(self) -> bool: |
         """
         return self._is_healthy

-    @contextlib.asynccontextmanager
-    async def lock_context(
-        self,
-        lock_key: str,
-        lock_value: bytes | str | None = None,
-        *,
-        blocking: bool = False,
-        blocking_timeout_s: NonNegativeFloat = 5,
-    ) -> AsyncIterator[Lock]:
-        """Tries to acquire a lock.
-
-        :param lock_key: unique name of the lock
-        :param lock_value: content of the lock, defaults to None
-        :param blocking: whether to block while acquiring the lock, defaults to False
-        :param blocking_timeout_s: time to wait while acquiring the lock before giving up, defaults to 5
-
-        :raises CouldNotAcquireLockError: reasons why lock acquisition can fail:
-            1. `blocking==False`: the lock is already held by some other entity
-            2. `blocking==True`: the wait for the lock to become free timed out (another entity holds the lock)
-        """
-
-        total_lock_duration: datetime.timedelta = DEFAULT_LOCK_TTL
-        lock_unique_id = f"lock_extender_{lock_key}_{uuid4()}"
-
-        ttl_lock: Lock = self._client.lock(
-            name=lock_key,
-            timeout=total_lock_duration.total_seconds(),
-            blocking=blocking,
-            blocking_timeout=blocking_timeout_s,
-        )
-
-        if not await ttl_lock.acquire(token=lock_value):
-            raise CouldNotAcquireLockError(lock=ttl_lock)
-
-        try:
-            async with periodic_task(
-                auto_extend_lock,
-                interval=total_lock_duration / 2,
-                task_name=lock_unique_id,
-                lock=ttl_lock,
-                stop_timeout=0.1,
-            ):
-                # lock is in use now
-                yield ttl_lock
-        finally:
-            # NOTE Why is this error suppressed? Given the following situation:
-            # - 250 locks are acquired in parallel with the option `blocking=True`,
-            #   meaning: each waits for the lock to be free before acquiring it
-            # - when the lock is acquired, the `_extend_lock` task is started
-            #   in the background, extending the lock at a fixed interval of time,
-            #   which is half of the lock's TTL
-            # - before the lock is released, the lock-extension task is cancelled
-            # Here is where the issue occurs:
-            # - some time passes between the task's cancellation and
-            #   the call to release the lock
-            # - if the TTL is too small, 1/2 of the TTL might be shorter than
-            #   the time that passes between cancelling the task and releasing the lock
-            # - this means that the lock will expire and no longer be considered owned
-            # For example: in one of the failing tests the TTL is set to `0.25` seconds,
-            # and half of that is `0.125` seconds.
-
-            # The above implies that only one "task" `owns` and `extends` the lock at a time.
-            # The issue appears to be related to these timings being too low.
-            try:
-                await ttl_lock.release()
-            except redis.exceptions.LockNotOwnedError:
-                # if this appears outside of tests it can cause issues, since something unexpected might be going on
-                _logger.warning(
-                    "Attention: lock is no longer owned. This is unexpected and requires investigation"
-                )
-
     def create_lock(
         self, lock_name: str, *, ttl: datetime.timedelta = DEFAULT_LOCK_TTL
     ) -> Lock:
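
The removed lock_context combined three steps: acquire a named lock with a TTL, keep extending that TTL from a background task for as long as the caller holds it, and release it on exit (tolerating LockNotOwnedError). Below is a minimal, self-contained sketch of that pattern built only on redis.asyncio primitives. It is illustrative, not part of this package: the helper name auto_extended_lock, the TTL value, and the connection URL are placeholders, and a plain RuntimeError stands in for CouldNotAcquireLockError.

import asyncio
import contextlib
import datetime
from collections.abc import AsyncIterator

import redis.asyncio as aioredis
from redis.asyncio.lock import Lock

_LOCK_TTL = datetime.timedelta(seconds=10)  # illustrative TTL, not a library default


@contextlib.asynccontextmanager
async def auto_extended_lock(
    client: aioredis.Redis, lock_key: str
) -> AsyncIterator[Lock]:
    # acquire a named lock with a TTL, without blocking if it is already taken
    lock: Lock = client.lock(name=lock_key, timeout=_LOCK_TTL.total_seconds())
    if not await lock.acquire(blocking=False):
        msg = f"could not acquire lock '{lock_key}'"
        raise RuntimeError(msg)

    async def _keep_extending() -> None:
        # reset the TTL at half its duration so the lock cannot expire while held
        while True:
            await asyncio.sleep(_LOCK_TTL.total_seconds() / 2)
            await lock.reacquire()

    extender = asyncio.create_task(_keep_extending())
    try:
        yield lock
    finally:
        extender.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await extender
        await lock.release()


async def main() -> None:
    client = aioredis.from_url("redis://localhost:6379")  # placeholder URL
    async with auto_extended_lock(client, "my-resource") as lock:
        assert await lock.owned()  # we hold the lock inside this block
    await client.aclose()  # redis-py >= 5; use close() on older versions


if __name__ == "__main__":
    asyncio.run(main())

Extending at half the TTL mirrors the interval the removed code used (total_lock_duration / 2), which keeps a margin before expiry as long as the extender task is never delayed for more than half a TTL, the exact timing issue the removed NOTE comment describes.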