@@ -38,6 +38,7 @@ async def _managed_semaphore_execution(
3838 semaphore_key : str ,
3939 ttl : datetime .timedelta ,
4040 execution_context : str ,
41+ expected_lock_overall_time : datetime .timedelta ,
4142) -> AsyncIterator :
4243 """Common semaphore management logic with auto-renewal."""
4344 # Acquire the semaphore first
@@ -106,14 +107,14 @@ async def _periodic_renewer() -> None:
106107 finally :
107108 lock_release_time = arrow .utcnow ()
108109 locking_time = lock_release_time - lock_acquisition_time
109- if locking_time > DEFAULT_EXPECTED_LOCK_OVERALL_TIME :
110+ if locking_time > expected_lock_overall_time :
110111 _logger .warning (
111112 "Semaphore '%s' was held for %s which is longer than expected (%s). "
112113 "TIP: consider reducing the locking time by optimizing the code inside "
113114 "the critical section or increasing the default locking time" ,
114115 semaphore_key ,
115116 locking_time ,
116- DEFAULT_EXPECTED_LOCK_OVERALL_TIME ,
117+ expected_lock_overall_time ,
117118 )
118119
119120
@@ -157,6 +158,7 @@ def with_limited_concurrency(
157158 ttl : datetime .timedelta = DEFAULT_SEMAPHORE_TTL ,
158159 blocking : bool = True ,
159160 blocking_timeout : datetime .timedelta | None = DEFAULT_SOCKET_TIMEOUT ,
161+ expected_lock_overall_time : datetime .timedelta = DEFAULT_EXPECTED_LOCK_OVERALL_TIME ,
160162) -> Callable [
161163 [Callable [P , Coroutine [Any , Any , R ]]], Callable [P , Coroutine [Any , Any , R ]]
162164]:
@@ -174,6 +176,7 @@ def with_limited_concurrency(
174176 ttl: Time-to-live for semaphore entries (default: 5 minutes)
175177 blocking: Whether to block when semaphore is full (default: True)
176178 blocking_timeout: Maximum time to wait when blocking (default: socket timeout)
179+ expected_lock_overall_time: helper for logging warnings if lock is held longer than expected
177180
178181 Example:
179182 @with_limited_concurrency(
@@ -209,7 +212,11 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
209212 )
210213
211214 async with _managed_semaphore_execution (
212- semaphore , semaphore_key , ttl , f"coroutine_{ coro .__name__ } "
215+ semaphore ,
216+ semaphore_key ,
217+ ttl ,
218+ f"coroutine_{ coro .__name__ } " ,
219+ expected_lock_overall_time ,
213220 ):
214221 return await coro (* args , ** kwargs )
215222
@@ -226,6 +233,7 @@ def with_limited_concurrency_cm(
226233 ttl : datetime .timedelta = DEFAULT_SEMAPHORE_TTL ,
227234 blocking : bool = True ,
228235 blocking_timeout : datetime .timedelta | None = DEFAULT_SOCKET_TIMEOUT ,
236+ expected_lock_overall_time : datetime .timedelta = DEFAULT_EXPECTED_LOCK_OVERALL_TIME ,
229237) -> Callable [
230238 [Callable [P , AbstractAsyncContextManager [R ]]],
231239 Callable [P , AbstractAsyncContextManager [R ]],
@@ -244,6 +252,7 @@ def with_limited_concurrency_cm(
244252 ttl: Time-to-live for semaphore entries (default: 5 minutes)
245253 blocking: Whether to block when semaphore is full (default: True)
246254 blocking_timeout: Maximum time to wait when blocking (default: socket timeout)
255+ expected_lock_overall_time: helper for logging warnings if lock is held longer than expected
247256
248257 Example:
249258 @asynccontextmanager
@@ -281,7 +290,11 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> AsyncIterator[R]:
281290
282291 async with (
283292 _managed_semaphore_execution (
284- semaphore , semaphore_key , ttl , f"context_manager_{ cm_func .__name__ } "
293+ semaphore ,
294+ semaphore_key ,
295+ ttl ,
296+ f"context_manager_{ cm_func .__name__ } " ,
297+ expected_lock_overall_time ,
285298 ),
286299 cm_func (* args , ** kwargs ) as value ,
287300 ):
0 commit comments