Commit ca9a457

Merge branch 'master' into 8387-fix-log-endpoint-in-api-server
2 parents a85d544 + 3ba41bc

8 files changed: +242 −186 lines

packages/service-library/src/servicelib/redis/_semaphore_decorator.py

Lines changed: 17 additions & 4 deletions

@@ -38,6 +38,7 @@ async def _managed_semaphore_execution(
     semaphore_key: str,
     ttl: datetime.timedelta,
     execution_context: str,
+    expected_lock_overall_time: datetime.timedelta,
 ) -> AsyncIterator:
     """Common semaphore management logic with auto-renewal."""
     # Acquire the semaphore first

@@ -106,14 +107,14 @@ async def _periodic_renewer() -> None:
     finally:
         lock_release_time = arrow.utcnow()
         locking_time = lock_release_time - lock_acquisition_time
-        if locking_time > DEFAULT_EXPECTED_LOCK_OVERALL_TIME:
+        if locking_time > expected_lock_overall_time:
             _logger.warning(
                 "Semaphore '%s' was held for %s which is longer than expected (%s). "
                 "TIP: consider reducing the locking time by optimizing the code inside "
                 "the critical section or increasing the default locking time",
                 semaphore_key,
                 locking_time,
-                DEFAULT_EXPECTED_LOCK_OVERALL_TIME,
+                expected_lock_overall_time,
             )

@@ -157,6 +158,7 @@ def with_limited_concurrency(
     ttl: datetime.timedelta = DEFAULT_SEMAPHORE_TTL,
     blocking: bool = True,
     blocking_timeout: datetime.timedelta | None = DEFAULT_SOCKET_TIMEOUT,
+    expected_lock_overall_time: datetime.timedelta = DEFAULT_EXPECTED_LOCK_OVERALL_TIME,
 ) -> Callable[
     [Callable[P, Coroutine[Any, Any, R]]], Callable[P, Coroutine[Any, Any, R]]
 ]:

@@ -174,6 +176,7 @@ def with_limited_concurrency(
         ttl: Time-to-live for semaphore entries (default: 5 minutes)
         blocking: Whether to block when semaphore is full (default: True)
         blocking_timeout: Maximum time to wait when blocking (default: socket timeout)
+        expected_lock_overall_time: helper for logging warnings if lock is held longer than expected

     Example:
         @with_limited_concurrency(

@@ -209,7 +212,11 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
             )

         async with _managed_semaphore_execution(
-            semaphore, semaphore_key, ttl, f"coroutine_{coro.__name__}"
+            semaphore,
+            semaphore_key,
+            ttl,
+            f"coroutine_{coro.__name__}",
+            expected_lock_overall_time,
         ):
             return await coro(*args, **kwargs)

@@ -226,6 +233,7 @@ def with_limited_concurrency_cm(
     ttl: datetime.timedelta = DEFAULT_SEMAPHORE_TTL,
     blocking: bool = True,
     blocking_timeout: datetime.timedelta | None = DEFAULT_SOCKET_TIMEOUT,
+    expected_lock_overall_time: datetime.timedelta = DEFAULT_EXPECTED_LOCK_OVERALL_TIME,
 ) -> Callable[
     [Callable[P, AbstractAsyncContextManager[R]]],
     Callable[P, AbstractAsyncContextManager[R]],

@@ -244,6 +252,7 @@ def with_limited_concurrency_cm(
         ttl: Time-to-live for semaphore entries (default: 5 minutes)
         blocking: Whether to block when semaphore is full (default: True)
         blocking_timeout: Maximum time to wait when blocking (default: socket timeout)
+        expected_lock_overall_time: helper for logging warnings if lock is held longer than expected

     Example:
         @asynccontextmanager

@@ -281,7 +290,11 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> AsyncIterator[R]:

         async with (
             _managed_semaphore_execution(
-                semaphore, semaphore_key, ttl, f"context_manager_{cm_func.__name__}"
+                semaphore,
+                semaphore_key,
+                ttl,
+                f"context_manager_{cm_func.__name__}",
+                expected_lock_overall_time,
             ),
             cm_func(*args, **kwargs) as value,
         ):
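
With this change the warning threshold becomes tunable per call site. A minimal usage sketch, assuming the import paths from the file above; the client constructor arguments, key name, and timing are illustrative, not from this commit:

    import datetime

    from servicelib.redis import RedisClientSDK
    from servicelib.redis._semaphore_decorator import with_limited_concurrency

    # illustrative client setup; the real DSN and client name depend on deployment
    redis_client = RedisClientSDK("redis://localhost:6379", client_name="example")

    @with_limited_concurrency(
        redis_client,
        key="cluster:jobs",  # hypothetical semaphore key
        capacity=5,
        expected_lock_overall_time=datetime.timedelta(minutes=2),
    )
    async def process_job() -> None:
        ...  # work expected to finish well within 2 minutes

Holding the semaphore longer than the threshold does not fail the call; it only triggers the "longer than expected" warning shown in the diff.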

packages/service-library/tests/redis/test_semaphore_decorator.py

Lines changed: 66 additions & 8 deletions

@@ -13,6 +13,7 @@
 import pytest
 from pytest_mock import MockerFixture
+from pytest_simcore.helpers.logging_tools import log_context
 from servicelib.redis import RedisClientSDK
 from servicelib.redis._constants import (
     SEMAPHORE_HOLDER_KEY_PREFIX,

@@ -275,10 +276,10 @@ async def limited_function() -> None:
         key=semaphore_name,
         capacity=1,
         blocking=False,
-        blocking_timeout=datetime.timedelta(seconds=0.1),
+        blocking_timeout=None,
     )
     async def limited_function_non_blocking() -> None:
-        await asyncio.sleep(0.5)
+        await asyncio.sleep(2)

     tasks = [asyncio.create_task(limited_function_non_blocking()) for _ in range(3)]
     results = await asyncio.gather(*tasks, return_exceptions=True)

@@ -365,11 +366,11 @@ async def test_with_large_capacity(
     redis_client_sdk: RedisClientSDK,
     semaphore_name: str,
 ):
-    large_capacity = 100
+    large_capacity = 20
     concurrent_count = 0
     max_concurrent = 0
-    sleep_time_s = 5
-    num_tasks = 1000
+    sleep_time_s = 10
+    num_tasks = 500

     @with_limited_concurrency(
         redis_client_sdk,

@@ -382,9 +383,8 @@ async def limited_function() -> None:
         nonlocal concurrent_count, max_concurrent
         concurrent_count += 1
         max_concurrent = max(max_concurrent, concurrent_count)
-        logging.info("Started task, current concurrent: %d", concurrent_count)
-        await asyncio.sleep(sleep_time_s)
-        logging.info("Done task, current concurrent: %d", concurrent_count)
+        with log_context(logging.INFO, f"task with {concurrent_count=}"):
+            await asyncio.sleep(sleep_time_s)
         concurrent_count -= 1

     # Start tasks equal to the large capacity

@@ -400,6 +400,63 @@ async def limited_function() -> None:
     assert max_concurrent <= large_capacity


+async def test_long_locking_logs_warning(
+    redis_client_sdk: RedisClientSDK,
+    semaphore_name: str,
+    caplog: pytest.LogCaptureFixture,
+    mocker: MockerFixture,
+):
+    @with_limited_concurrency(
+        redis_client_sdk,
+        key=semaphore_name,
+        capacity=1,
+        blocking=True,
+        blocking_timeout=None,
+        expected_lock_overall_time=datetime.timedelta(milliseconds=200),
+    )
+    async def limited_function() -> None:
+        with log_context(logging.INFO, "task"):
+            await asyncio.sleep(0.4)
+
+    with caplog.at_level(logging.WARNING):
+        await limited_function()
+        assert caplog.records
+        assert "longer than expected" in caplog.messages[-1]
+
+
+@pytest.mark.skip
+async def test_semaphore_fair_queuing(
+    redis_client_sdk: RedisClientSDK,
+    semaphore_name: str,
+):
+    entered_order: list[int] = []
+
+    @with_limited_concurrency(
+        redis_client_sdk,
+        key=semaphore_name,
+        capacity=1,
+    )
+    async def limited_function(call_id: int):
+        entered_order.append(call_id)
+        await asyncio.sleep(0.1)
+        return call_id
+
+    # Launch tasks in a specific order
+    num_tasks = 10
+    tasks = []
+    for i in range(num_tasks):
+        tasks.append(asyncio.create_task(limited_function(i)))
+        await asyncio.sleep(0.01)  # Small delay to help preserve order
+    results = await asyncio.gather(*tasks)
+
+    # All should complete successfully and in order
+    assert results == list(range(num_tasks))
+    # The order in which they entered the critical section should match the order of submission
+    assert entered_order == list(
+        range(num_tasks)
+    ), f"Expected fair queuing, got {entered_order}"
+
+
 async def test_context_manager_basic_functionality(
     redis_client_sdk: RedisClientSDK,
     semaphore_name: str,

@@ -442,6 +499,7 @@ async def test_context_manager_capacity_enforcement(
         redis_client_sdk,
         key=semaphore_name,
         capacity=2,
+        blocking_timeout=None,
     )
     @asynccontextmanager
     async def limited_context_manager():
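
The non-blocking test above leans on asyncio.gather(..., return_exceptions=True), which returns raised exceptions as result values instead of aborting the whole gather: with capacity 1 and three concurrent callers, two of the gathered results are expected to be acquisition failures. A self-contained illustration of that gather pattern (the failing coroutine here is hypothetical, not the semaphore decorator):

    import asyncio

    async def might_fail(i: int) -> int:
        if i % 2:
            raise RuntimeError(f"task {i} failed")
        return i

    async def main() -> None:
        results = await asyncio.gather(
            *(might_fail(i) for i in range(4)), return_exceptions=True
        )
        failures = [r for r in results if isinstance(r, BaseException)]
        # two of the four tasks raised; the others still returned their values
        assert len(failures) == 2

    asyncio.run(main())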

services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_computational.py

Lines changed: 7 additions & 1 deletion

@@ -88,7 +88,13 @@ async def list_unrunnable_tasks(self, app: FastAPI) -> list[DaskTask]:

     def get_task_required_resources(self, task) -> Resources:
         assert self  # nosec
-        return utils.resources_from_dask_task(task)
+        task_required_resources = utils.resources_from_dask_task(task)
+        # ensure cpu is set at least to 1 as dask-workers use 1 thread per CPU
+        if task_required_resources.cpus < 1.0:
+            task_required_resources = task_required_resources.model_copy(
+                update={"cpus": 1.0}
+            )
+        return task_required_resources

     async def get_task_defined_instance(
         self, app: FastAPI, task
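
The clamp relies on Pydantic v2's model_copy(update=...), which returns a new instance with only the named fields replaced. A self-contained sketch using a simplified stand-in for the real Resources model:

    from pydantic import BaseModel

    class Resources(BaseModel):  # simplified stand-in, not the actual model
        cpus: float
        ram: int

    required = Resources(cpus=0.1, ram=1024)
    if required.cpus < 1.0:
        required = required.model_copy(update={"cpus": 1.0})
    assert required == Resources(cpus=1.0, ram=1024)  # other fields are preserved

Copying rather than mutating keeps the original task resources untouched, which matters when the same model instance is read elsewhere.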

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py

Lines changed: 33 additions & 37 deletions

@@ -35,6 +35,7 @@
 from servicelib.logging_utils import log_catch, log_context
 from servicelib.rabbitmq import RabbitMQClient, RabbitMQRPCClient
 from servicelib.redis import RedisClientSDK
+from servicelib.utils import limited_gather
 from sqlalchemy.ext.asyncio import AsyncEngine

 from ...constants import UNDEFINED_STR_METADATA

@@ -79,6 +80,7 @@
 _MAX_WAITING_TIME_FOR_UNKNOWN_TASKS: Final[datetime.timedelta] = datetime.timedelta(
     seconds=30
 )
+_PUBLICATION_CONCURRENCY_LIMIT: Final[int] = 10


 def _auto_schedule_callback(

@@ -336,7 +338,7 @@ def _need_heartbeat(task: CompTaskAtDB) -> bool:
             project_id, dag
         )
         if running_tasks := [t for t in tasks.values() if _need_heartbeat(t)]:
-            await asyncio.gather(
+            await limited_gather(
                 *(
                     publish_service_resource_tracking_heartbeat(
                         self.rabbitmq_client,

@@ -345,17 +347,15 @@ def _need_heartbeat(task: CompTaskAtDB) -> bool:
                     ),
                 )
                 for t in running_tasks
-            )
+            ),
+            log=_logger,
+            limit=_PUBLICATION_CONCURRENCY_LIMIT,
         )
-        comp_tasks_repo = CompTasksRepository(self.db_engine)
-        await asyncio.gather(
-            *(
-                comp_tasks_repo.update_project_task_last_heartbeat(
-                    t.project_id, t.node_id, run_id, utc_now
-                )
-                for t in running_tasks
+        comp_tasks_repo = CompTasksRepository.instance(self.db_engine)
+        for task in running_tasks:
+            await comp_tasks_repo.update_project_task_last_heartbeat(
+                project_id, task.node_id, run_id, utc_now
             )
-        )

     async def _get_changed_tasks_from_backend(
         self,

@@ -400,7 +400,7 @@ async def _process_started_tasks(
         utc_now = arrow.utcnow().datetime

         # resource tracking
-        await asyncio.gather(
+        await limited_gather(
             *(
                 publish_service_resource_tracking_started(
                     self.rabbitmq_client,

@@ -462,10 +462,12 @@ async def _process_started_tasks(
                     service_additional_metadata={},
                 )
                 for t in tasks
-            )
+            ),
+            log=_logger,
+            limit=_PUBLICATION_CONCURRENCY_LIMIT,
         )
         # instrumentation
-        await asyncio.gather(
+        await limited_gather(
             *(
                 publish_service_started_metrics(
                     self.rabbitmq_client,

@@ -476,24 +478,22 @@ async def _process_started_tasks(
                     task=t,
                 )
                 for t in tasks
-            )
+            ),
+            log=_logger,
+            limit=_PUBLICATION_CONCURRENCY_LIMIT,
         )

         # update DB
         comp_tasks_repo = CompTasksRepository(self.db_engine)
-        await asyncio.gather(
-            *(
-                comp_tasks_repo.update_project_tasks_state(
-                    t.project_id,
-                    run_id,
-                    [t.node_id],
-                    t.state,
-                    optional_started=utc_now,
-                    optional_progress=t.progress,
-                )
-                for t in tasks
+        for task in tasks:
+            await comp_tasks_repo.update_project_tasks_state(
+                project_id,
+                run_id,
+                [task.node_id],
+                task.state,
+                optional_started=utc_now,
+                optional_progress=task.progress,
             )
-        )
         await CompRunsRepository.instance(self.db_engine).mark_as_started(
             user_id=user_id,
             project_id=project_id,

@@ -504,18 +504,14 @@ async def _process_started_tasks(
     async def _process_waiting_tasks(
         self, tasks: list[TaskStateTracker], run_id: PositiveInt
     ) -> None:
-        comp_tasks_repo = CompTasksRepository(self.db_engine)
-        await asyncio.gather(
-            *(
-                comp_tasks_repo.update_project_tasks_state(
-                    t.current.project_id,
-                    run_id,
-                    [t.current.node_id],
-                    t.current.state,
-                )
-                for t in tasks
+        comp_tasks_repo = CompTasksRepository.instance(self.db_engine)
+        for task in tasks:
+            await comp_tasks_repo.update_project_tasks_state(
+                task.current.project_id,
+                run_id,
+                [task.current.node_id],
+                task.current.state,
             )
-        )

     async def _update_states_from_comp_backend(
         self,
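
Here limited_gather caps how many RabbitMQ publications run at once, where asyncio.gather would fire them all simultaneously. The servicelib implementation is not part of this diff; a minimal sketch of the semaphore-based idea, under the assumption that it simply bounds concurrency and collects results:

    import asyncio
    from typing import Any, Coroutine

    async def limited_gather_sketch(
        *coros: Coroutine[Any, Any, Any], limit: int = 10
    ) -> list[Any]:
        # at most `limit` coroutines may be awaited concurrently
        semaphore = asyncio.Semaphore(limit)

        async def _bounded(coro: Coroutine[Any, Any, Any]) -> Any:
            async with semaphore:
                return await coro

        return await asyncio.gather(*(_bounded(c) for c in coros))

Note that the DB updates in this file were deliberately rewritten as plain for-loops rather than gathered, keeping the repository calls sequential.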
