Commit 08961c6

Merge branch 'master' into fixes/rtc

2 parents 720a9cf + 3ba41bc

File tree: 4 files changed, +98 −14 lines

packages/service-library/src/servicelib/redis/_semaphore_decorator.py

Lines changed: 17 additions & 4 deletions
```diff
@@ -38,6 +38,7 @@ async def _managed_semaphore_execution(
     semaphore_key: str,
     ttl: datetime.timedelta,
     execution_context: str,
+    expected_lock_overall_time: datetime.timedelta,
 ) -> AsyncIterator:
     """Common semaphore management logic with auto-renewal."""
     # Acquire the semaphore first
@@ -106,14 +107,14 @@ async def _periodic_renewer() -> None:
     finally:
         lock_release_time = arrow.utcnow()
         locking_time = lock_release_time - lock_acquisition_time
-        if locking_time > DEFAULT_EXPECTED_LOCK_OVERALL_TIME:
+        if locking_time > expected_lock_overall_time:
             _logger.warning(
                 "Semaphore '%s' was held for %s which is longer than expected (%s). "
                 "TIP: consider reducing the locking time by optimizing the code inside "
                 "the critical section or increasing the default locking time",
                 semaphore_key,
                 locking_time,
-                DEFAULT_EXPECTED_LOCK_OVERALL_TIME,
+                expected_lock_overall_time,
             )


@@ -157,6 +158,7 @@ def with_limited_concurrency(
     ttl: datetime.timedelta = DEFAULT_SEMAPHORE_TTL,
     blocking: bool = True,
     blocking_timeout: datetime.timedelta | None = DEFAULT_SOCKET_TIMEOUT,
+    expected_lock_overall_time: datetime.timedelta = DEFAULT_EXPECTED_LOCK_OVERALL_TIME,
 ) -> Callable[
     [Callable[P, Coroutine[Any, Any, R]]], Callable[P, Coroutine[Any, Any, R]]
 ]:
@@ -174,6 +176,7 @@ def with_limited_concurrency(
         ttl: Time-to-live for semaphore entries (default: 5 minutes)
         blocking: Whether to block when semaphore is full (default: True)
         blocking_timeout: Maximum time to wait when blocking (default: socket timeout)
+        expected_lock_overall_time: helper for logging warnings if lock is held longer than expected

     Example:
         @with_limited_concurrency(
@@ -209,7 +212,11 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
             )

             async with _managed_semaphore_execution(
-                semaphore, semaphore_key, ttl, f"coroutine_{coro.__name__}"
+                semaphore,
+                semaphore_key,
+                ttl,
+                f"coroutine_{coro.__name__}",
+                expected_lock_overall_time,
             ):
                 return await coro(*args, **kwargs)

@@ -226,6 +233,7 @@ def with_limited_concurrency_cm(
     ttl: datetime.timedelta = DEFAULT_SEMAPHORE_TTL,
     blocking: bool = True,
     blocking_timeout: datetime.timedelta | None = DEFAULT_SOCKET_TIMEOUT,
+    expected_lock_overall_time: datetime.timedelta = DEFAULT_EXPECTED_LOCK_OVERALL_TIME,
 ) -> Callable[
     [Callable[P, AbstractAsyncContextManager[R]]],
     Callable[P, AbstractAsyncContextManager[R]],
@@ -244,6 +252,7 @@ def with_limited_concurrency_cm(
         ttl: Time-to-live for semaphore entries (default: 5 minutes)
         blocking: Whether to block when semaphore is full (default: True)
         blocking_timeout: Maximum time to wait when blocking (default: socket timeout)
+        expected_lock_overall_time: helper for logging warnings if lock is held longer than expected

     Example:
         @asynccontextmanager
@@ -281,7 +290,11 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> AsyncIterator[R]:

         async with (
             _managed_semaphore_execution(
-                semaphore, semaphore_key, ttl, f"context_manager_{cm_func.__name__}"
+                semaphore,
+                semaphore_key,
+                ttl,
+                f"context_manager_{cm_func.__name__}",
+                expected_lock_overall_time,
             ),
             cm_func(*args, **kwargs) as value,
         ):
```
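
Taken together, these changes thread a per-call `expected_lock_overall_time` through `_managed_semaphore_execution`, so each decorated function can set its own warning threshold instead of relying on the global `DEFAULT_EXPECTED_LOCK_OVERALL_TIME`. A minimal usage sketch, assuming `with_limited_concurrency` is importable from `servicelib.redis._semaphore_decorator` (the module path shown above) and that the `RedisClientSDK` constructor accepts a Redis DSN (both are assumptions, not confirmed by this diff):

```python
import asyncio
import datetime

from servicelib.redis import RedisClientSDK
from servicelib.redis._semaphore_decorator import with_limited_concurrency

# hypothetical DSN; constructor details may differ in the real SDK
redis_client = RedisClientSDK("redis://localhost:6379")


@with_limited_concurrency(
    redis_client,
    key="my-batch-job",  # semaphore name shared by all workers
    capacity=5,          # at most 5 concurrent executions cluster-wide
    expected_lock_overall_time=datetime.timedelta(seconds=30),
)
async def process_batch() -> None:
    await asyncio.sleep(1)  # stand-in for the real critical section
```

If `process_batch` ends up holding the semaphore longer than 30 seconds, the `finally` block above logs the "longer than expected" warning with both the actual and the expected duration.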

packages/service-library/tests/redis/test_semaphore_decorator.py

Lines changed: 66 additions & 8 deletions
```diff
@@ -13,6 +13,7 @@

 import pytest
 from pytest_mock import MockerFixture
+from pytest_simcore.helpers.logging_tools import log_context
 from servicelib.redis import RedisClientSDK
 from servicelib.redis._constants import (
     SEMAPHORE_HOLDER_KEY_PREFIX,
@@ -275,10 +276,10 @@ async def limited_function() -> None:
         key=semaphore_name,
         capacity=1,
         blocking=False,
-        blocking_timeout=datetime.timedelta(seconds=0.1),
+        blocking_timeout=None,
     )
     async def limited_function_non_blocking() -> None:
-        await asyncio.sleep(0.5)
+        await asyncio.sleep(2)

     tasks = [asyncio.create_task(limited_function_non_blocking()) for _ in range(3)]
     results = await asyncio.gather(*tasks, return_exceptions=True)
@@ -365,11 +366,11 @@ async def test_with_large_capacity(
     redis_client_sdk: RedisClientSDK,
     semaphore_name: str,
 ):
-    large_capacity = 100
+    large_capacity = 20
     concurrent_count = 0
     max_concurrent = 0
-    sleep_time_s = 5
-    num_tasks = 1000
+    sleep_time_s = 10
+    num_tasks = 500

     @with_limited_concurrency(
         redis_client_sdk,
@@ -382,9 +383,8 @@ async def limited_function() -> None:
         nonlocal concurrent_count, max_concurrent
         concurrent_count += 1
         max_concurrent = max(max_concurrent, concurrent_count)
-        logging.info("Started task, current concurrent: %d", concurrent_count)
-        await asyncio.sleep(sleep_time_s)
-        logging.info("Done task, current concurrent: %d", concurrent_count)
+        with log_context(logging.INFO, f"task with {concurrent_count=}"):
+            await asyncio.sleep(sleep_time_s)
         concurrent_count -= 1

     # Start tasks equal to the large capacity
@@ -400,6 +400,63 @@ async def limited_function() -> None:
     assert max_concurrent <= large_capacity


+async def test_long_locking_logs_warning(
+    redis_client_sdk: RedisClientSDK,
+    semaphore_name: str,
+    caplog: pytest.LogCaptureFixture,
+    mocker: MockerFixture,
+):
+    @with_limited_concurrency(
+        redis_client_sdk,
+        key=semaphore_name,
+        capacity=1,
+        blocking=True,
+        blocking_timeout=None,
+        expected_lock_overall_time=datetime.timedelta(milliseconds=200),
+    )
+    async def limited_function() -> None:
+        with log_context(logging.INFO, "task"):
+            await asyncio.sleep(0.4)
+
+    with caplog.at_level(logging.WARNING):
+        await limited_function()
+        assert caplog.records
+        assert "longer than expected" in caplog.messages[-1]
+
+
+@pytest.mark.skip
+async def test_semaphore_fair_queuing(
+    redis_client_sdk: RedisClientSDK,
+    semaphore_name: str,
+):
+    entered_order: list[int] = []
+
+    @with_limited_concurrency(
+        redis_client_sdk,
+        key=semaphore_name,
+        capacity=1,
+    )
+    async def limited_function(call_id: int):
+        entered_order.append(call_id)
+        await asyncio.sleep(0.1)
+        return call_id
+
+    # Launch tasks in a specific order
+    num_tasks = 10
+    tasks = []
+    for i in range(num_tasks):
+        tasks.append(asyncio.create_task(limited_function(i)))
+        await asyncio.sleep(0.01)  # Small delay to help preserve order
+    results = await asyncio.gather(*tasks)
+
+    # All should complete successfully and in order
+    assert results == list(range(num_tasks))
+    # The order in which they entered the critical section should match the order of submission
+    assert entered_order == list(
+        range(num_tasks)
+    ), f"Expected fair queuing, got {entered_order}"
+
+
 async def test_context_manager_basic_functionality(
     redis_client_sdk: RedisClientSDK,
     semaphore_name: str,
@@ -442,6 +499,7 @@ async def test_context_manager_capacity_enforcement(
         redis_client_sdk,
         key=semaphore_name,
         capacity=2,
+        blocking_timeout=None,
     )
     @asynccontextmanager
    async def limited_context_manager():
```
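
The tests lean on `log_context` from `pytest_simcore.helpers.logging_tools`, which is not shown in this diff. Judging only from its call sites (`with log_context(logging.INFO, "task"):`), it is a context manager that brackets a block with log messages. A hypothetical stand-in with that shape, purely to illustrate the pattern (the real helper may behave differently):

```python
import logging
from collections.abc import Iterator
from contextlib import contextmanager


@contextmanager
def log_context(level: int, msg: str) -> Iterator[None]:
    # hypothetical stand-in: the real helper lives in
    # pytest_simcore.helpers.logging_tools and may differ
    logger = logging.getLogger(__name__)
    logger.log(level, "%s ...", msg)  # entering the block
    try:
        yield
    finally:
        logger.log(level, "%s done", msg)  # leaving the block
```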

services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_computational.py

Lines changed: 7 additions & 1 deletion
```diff
@@ -88,7 +88,13 @@ async def list_unrunnable_tasks(self, app: FastAPI) -> list[DaskTask]:

     def get_task_required_resources(self, task) -> Resources:
         assert self  # nosec
-        return utils.resources_from_dask_task(task)
+        task_required_resources = utils.resources_from_dask_task(task)
+        # ensure cpu is set at least to 1 as dask-workers use 1 thread per CPU
+        if task_required_resources.cpus < 1.0:
+            task_required_resources = task_required_resources.model_copy(
+                update={"cpus": 1.0}
+            )
+        return task_required_resources

     async def get_task_defined_instance(
         self, app: FastAPI, task
```
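
The `model_copy(update=...)` call indicates `Resources` is a Pydantic v2 model. The clamp can be exercised in isolation with a toy model; everything here except the `cpus` field and the rounding rule is an assumption for illustration:

```python
from pydantic import BaseModel


class Resources(BaseModel):
    # toy stand-in for the real Resources model; only `cpus` matters here
    cpus: float
    ram: int = 0


def floor_cpus(required: Resources) -> Resources:
    # dask workers allocate one thread per CPU, so a fractional CPU
    # requirement must be rounded up to at least one full CPU
    if required.cpus < 1.0:
        return required.model_copy(update={"cpus": 1.0})
    return required


assert floor_cpus(Resources(cpus=0.1)).cpus == 1.0
assert floor_cpus(Resources(cpus=2.0)).cpus == 2.0
```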

services/director-v2/src/simcore_service_director_v2/modules/dask_client.py

Lines changed: 8 additions & 1 deletion
```diff
@@ -15,6 +15,7 @@
 from typing import Final, cast

 import distributed
+import distributed.client
 from aiohttp import ClientResponseError
 from common_library.json_serialization import json_dumps
 from common_library.logging.logging_errors import create_troubleshooting_log_kwargs
@@ -68,6 +69,7 @@

 from ..core.errors import (
     ComputationalBackendNoS3AccessError,
+    ComputationalBackendNotConnectedError,
     ComputationalBackendTaskNotFoundError,
     ComputationalBackendTaskResultsNotReadyError,
     TaskSchedulingError,
@@ -92,7 +94,7 @@


 _UserCallbackInSepThread = Callable[[], None]
-_MAX_CONCURRENT_CLIENT_CONNECTIONS: Final[int] = 10
+_MAX_CONCURRENT_CLIENT_CONNECTIONS: Final[int] = 1


 @dataclass(frozen=True, kw_only=True, slots=True)
@@ -552,6 +554,11 @@ async def get_task_result(self, job_id: str) -> TaskOutputData:
             raise ComputationalBackendTaskNotFoundError(job_id=job_id) from exc
         except distributed.TimeoutError as exc:
             raise ComputationalBackendTaskResultsNotReadyError(job_id=job_id) from exc
+        except (
+            distributed.client.FutureCancelledError,
+            distributed.client.FuturesCancelledError,
+        ) as exc:
+            raise ComputationalBackendNotConnectedError from exc

     async def release_task_result(self, job_id: str) -> None:
         _logger.debug("releasing results for %s", f"{job_id=}")
```
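
The new `except` clause translates dask's cancelled-future exceptions into the service's own `ComputationalBackendNotConnectedError`, so callers see a backend-connectivity failure instead of a raw `distributed` exception. A sketch of the pattern in isolation, with a placeholder domain error and a hypothetical `fetch_result` helper standing in for `get_task_result` (the `Future` construction and timeout are assumptions, not taken from the diff):

```python
import distributed
import distributed.client


class BackendNotConnectedError(RuntimeError):
    """Placeholder for the service's ComputationalBackendNotConnectedError."""


async def fetch_result(client: distributed.Client, job_id: str):
    # assumes an asynchronous client and that job_id is a valid task key
    try:
        future = distributed.Future(job_id, client=client)
        return await future.result(timeout=10)
    except (
        distributed.client.FutureCancelledError,
        distributed.client.FuturesCancelledError,
    ) as exc:
        # the future was cancelled, typically because the connection to the
        # scheduler was lost; surface this as a domain-level error
        raise BackendNotConnectedError from exc
```

Note also the drop of `_MAX_CONCURRENT_CLIENT_CONNECTIONS` from 10 to 1 in the same file, which serializes client connections to the dask scheduler.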
