Skip to content

Commit 67993c5

Browse files
Merge branch 'master' into improve-celery-task-error-messages-and-improve-taskid-name
2 parents 6077fb8 + 699808f commit 67993c5

File tree

43 files changed

+5709
-95
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+5709
-95
lines changed

packages/pytest-simcore/src/pytest_simcore/helpers/docker.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class ContainerStatus(str, Enum):
3737

3838
_COLOR_ENCODING_RE = re.compile(r"\x1B\[([0-9]{1,2}(;[0-9]{1,2})?)?[mGK]")
3939
_MAX_PATH_CHAR_LEN_ALLOWED = 260
40-
_kFILENAME_TOO_LONG = 36
40+
_FILENAME_TOO_LONG = 36
4141
_NORMPATH_COUNT = 0
4242

4343

@@ -94,7 +94,7 @@ def get_service_published_port(
9494
)
9595

9696
for target_port in ports_to_look_for:
97-
target_port = int(target_port)
97+
target_port = int(target_port) # noqa: PLW2901
9898
for p in service_ports:
9999
if p["TargetPort"] == target_port:
100100
published_port = p["PublishedPort"]
@@ -158,7 +158,7 @@ def run_docker_compose_config(
158158
args = [f"{docker_compose_path}", *bash_options]
159159
print(" ".join(args))
160160

161-
process = subprocess.run(
161+
process = subprocess.run( # noqa: S603
162162
args,
163163
cwd=project_dir,
164164
capture_output=True,
@@ -189,7 +189,7 @@ def shorten_path(filename: str) -> Path:
189189
# This helper function tries to normalize the path
190190
# Another possibility would be that the path has some
191191
# problematic characters but so far we did not find any case ...
192-
global _NORMPATH_COUNT # pylint: disable=global-statement
192+
global _NORMPATH_COUNT # pylint: disable=global-statement # noqa: PLW0603
193193

194194
if len(filename) > _MAX_PATH_CHAR_LEN_ALLOWED:
195195
_NORMPATH_COUNT += 1
@@ -215,7 +215,7 @@ def safe_artifact_name(name: str) -> str:
215215
return BANNED_CHARS_FOR_ARTIFACTS.sub("_", name)
216216

217217

218-
def save_docker_infos(destination_dir: Path):
218+
def save_docker_infos(destination_dir: Path): # noqa: C901
219219
client = docker.from_env()
220220

221221
# Includes stop containers, which might be e.g. failing tasks
@@ -228,7 +228,7 @@ def save_docker_infos(destination_dir: Path):
228228
destination_dir.mkdir(parents=True, exist_ok=True)
229229

230230
except OSError as err:
231-
if err.errno == _kFILENAME_TOO_LONG:
231+
if err.errno == _FILENAME_TOO_LONG:
232232
destination_dir = shorten_path(err.filename)
233233
destination_dir.mkdir(parents=True, exist_ok=True)
234234

@@ -245,7 +245,7 @@ def save_docker_infos(destination_dir: Path):
245245
)
246246

247247
except OSError as err:
248-
if err.errno == _kFILENAME_TOO_LONG:
248+
if err.errno == _FILENAME_TOO_LONG:
249249
shorten_path(err.filename).write_text(
250250
_COLOR_ENCODING_RE.sub("", logs)
251251
)
@@ -256,12 +256,12 @@ def save_docker_infos(destination_dir: Path):
256256
json.dumps(container.attrs, indent=2)
257257
)
258258
except OSError as err:
259-
if err.errno == _kFILENAME_TOO_LONG:
259+
if err.errno == _FILENAME_TOO_LONG:
260260
shorten_path(err.filename).write_text(
261261
json.dumps(container.attrs, indent=2)
262262
)
263263

264-
except Exception as err: # pylint: disable=broad-except # noqa: PERF203
264+
except Exception as err: # pylint: disable=broad-except
265265
if container.status != ContainerStatus.created:
266266
print(
267267
f"Error while dumping {container.name=}, {container.status=}.\n\t{err=}"
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
from collections.abc import AsyncIterator, Callable
2+
from contextlib import AbstractAsyncContextManager, asynccontextmanager
3+
from typing import TYPE_CHECKING, Protocol
4+
5+
from tenacity.asyncio import AsyncRetrying
6+
from tenacity.retry import retry_if_exception_type
7+
from tenacity.stop import stop_after_delay
8+
from tenacity.wait import wait_fixed
9+
10+
if TYPE_CHECKING:
11+
from servicelib.rabbitmq import RabbitMQClient
12+
from servicelib.redis import RedisClientSDK
13+
14+
15+
class _ClientWithPingProtocol(Protocol):
16+
async def ping(self) -> bool: ...
17+
18+
19+
@asynccontextmanager
20+
async def _paused_container(
21+
paused_container: Callable[[str], AbstractAsyncContextManager[None]],
22+
container_name: str,
23+
client: _ClientWithPingProtocol,
24+
) -> AsyncIterator[None]:
25+
async with paused_container(container_name):
26+
yield
27+
28+
async for attempt in AsyncRetrying(
29+
wait=wait_fixed(0.1),
30+
stop=stop_after_delay(10),
31+
reraise=True,
32+
retry=retry_if_exception_type(AssertionError),
33+
):
34+
with attempt:
35+
assert await client.ping() is True
36+
37+
38+
@asynccontextmanager
39+
async def pause_rabbit(
40+
paused_container: Callable[[str], AbstractAsyncContextManager[None]],
41+
rabbit_client: "RabbitMQClient",
42+
) -> AsyncIterator[None]:
43+
"""
44+
Pause RabbitMQ container during the context block,
45+
ensuring it's fully down before and back up after.
46+
"""
47+
async with _paused_container(paused_container, "rabbit", rabbit_client):
48+
yield
49+
50+
51+
@asynccontextmanager
52+
async def pause_redis(
53+
paused_container: Callable[[str], AbstractAsyncContextManager[None]],
54+
redis_client: "RedisClientSDK",
55+
) -> AsyncIterator[None]:
56+
"""
57+
Pause Redis container during the context block,
58+
saving a DB snapshot first for a clean restore point.
59+
Ensures Redis is down before yielding, and back up after.
60+
"""
61+
await redis_client.redis.save()
62+
63+
async with _paused_container(paused_container, "redis", redis_client):
64+
yield

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/webserver/functions/functions_rpc_interface.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import logging
2-
from typing import Literal
2+
from typing import Annotated, Literal
33

44
from models_library.api_schemas_webserver import WEBSERVER_RPC_NAMESPACE
55
from models_library.api_schemas_webserver.functions import (
@@ -32,13 +32,15 @@
3232
from models_library.rest_ordering import OrderBy
3333
from models_library.rest_pagination import PageMetaInfoLimitOffset
3434
from models_library.users import UserID
35-
from pydantic import TypeAdapter
35+
from pydantic import PositiveInt, TypeAdapter
3636

3737
from .....logging_utils import log_decorator
3838
from .... import RabbitMQRPCClient
3939

4040
_logger = logging.getLogger(__name__)
4141

42+
_FUNCTION_RPC_TIMEOUT_SEC: Annotated[int, PositiveInt] = 30
43+
4244

4345
@log_decorator(_logger, level=logging.DEBUG)
4446
async def register_function(
@@ -54,6 +56,7 @@ async def register_function(
5456
function=function,
5557
user_id=user_id,
5658
product_name=product_name,
59+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
5760
)
5861
return TypeAdapter(RegisteredFunction).validate_python(
5962
result
@@ -74,6 +77,7 @@ async def get_function(
7477
function_id=function_id,
7578
user_id=user_id,
7679
product_name=product_name,
80+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
7781
)
7882
return TypeAdapter(RegisteredFunction).validate_python(result)
7983

@@ -92,6 +96,7 @@ async def get_function_input_schema(
9296
function_id=function_id,
9397
user_id=user_id,
9498
product_name=product_name,
99+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
95100
)
96101
return TypeAdapter(FunctionInputSchema).validate_python(result)
97102

@@ -110,6 +115,7 @@ async def get_function_output_schema(
110115
function_id=function_id,
111116
user_id=user_id,
112117
product_name=product_name,
118+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
113119
)
114120
return TypeAdapter(FunctionOutputSchema).validate_python(result)
115121

@@ -128,6 +134,7 @@ async def delete_function(
128134
function_id=function_id,
129135
user_id=user_id,
130136
product_name=product_name,
137+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
131138
)
132139
assert result is None # nosec
133140
return result
@@ -158,6 +165,7 @@ async def list_functions(
158165
filter_by_function_class=filter_by_function_class,
159166
search_by_function_title=search_by_function_title,
160167
search_by_multi_columns=search_by_multi_columns,
168+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
161169
)
162170
)
163171
return TypeAdapter(
@@ -188,6 +196,7 @@ async def list_function_jobs(
188196
filter_by_function_id=filter_by_function_id,
189197
filter_by_function_job_ids=filter_by_function_job_ids,
190198
filter_by_function_job_collection_id=filter_by_function_job_collection_id,
199+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
191200
)
192201
)
193202
return TypeAdapter(
@@ -220,6 +229,7 @@ async def list_function_jobs_with_status(
220229
filter_by_function_id=filter_by_function_id,
221230
filter_by_function_job_ids=filter_by_function_job_ids,
222231
filter_by_function_job_collection_id=filter_by_function_job_collection_id,
232+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
223233
)
224234
return TypeAdapter(
225235
tuple[
@@ -247,6 +257,7 @@ async def list_function_job_collections(
247257
filters=filters,
248258
user_id=user_id,
249259
product_name=product_name,
260+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
250261
)
251262
return TypeAdapter(
252263
tuple[list[RegisteredFunctionJobCollection], PageMetaInfoLimitOffset]
@@ -269,6 +280,7 @@ async def update_function_title(
269280
title=title,
270281
user_id=user_id,
271282
product_name=product_name,
283+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
272284
)
273285
return TypeAdapter(RegisteredFunction).validate_python(result)
274286

@@ -289,6 +301,7 @@ async def update_function_description(
289301
description=description,
290302
user_id=user_id,
291303
product_name=product_name,
304+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
292305
)
293306
return TypeAdapter(RegisteredFunction).validate_python(result)
294307

@@ -309,6 +322,7 @@ async def run_function(
309322
inputs=inputs,
310323
user_id=user_id,
311324
product_name=product_name,
325+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
312326
)
313327
return TypeAdapter(RegisteredFunctionJob).validate_python(
314328
result
@@ -329,6 +343,7 @@ async def register_function_job(
329343
function_job=function_job,
330344
user_id=user_id,
331345
product_name=product_name,
346+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
332347
)
333348
return TypeAdapter(RegisteredFunctionJob).validate_python(
334349
result
@@ -351,6 +366,7 @@ async def patch_registered_function_job(
351366
product_name=product_name,
352367
function_job_uuid=function_job_uuid,
353368
registered_function_job_patch=registered_function_job_patch,
369+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
354370
)
355371
return TypeAdapter(RegisteredFunctionJob).validate_python(
356372
result
@@ -371,6 +387,7 @@ async def get_function_job(
371387
function_job_id=function_job_id,
372388
user_id=user_id,
373389
product_name=product_name,
390+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
374391
)
375392

376393
return TypeAdapter(RegisteredFunctionJob).validate_python(result)
@@ -390,6 +407,7 @@ async def get_function_job_status(
390407
function_job_id=function_job_id,
391408
user_id=user_id,
392409
product_name=product_name,
410+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
393411
)
394412
return TypeAdapter(FunctionJobStatus).validate_python(result)
395413

@@ -408,6 +426,7 @@ async def get_function_job_outputs(
408426
function_job_id=function_job_id,
409427
user_id=user_id,
410428
product_name=product_name,
429+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
411430
)
412431
return TypeAdapter(FunctionOutputs).validate_python(result)
413432

@@ -430,6 +449,7 @@ async def update_function_job_status(
430449
user_id=user_id,
431450
product_name=product_name,
432451
check_write_permissions=check_write_permissions,
452+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
433453
)
434454
return TypeAdapter(FunctionJobStatus).validate_python(result)
435455

@@ -452,6 +472,7 @@ async def update_function_job_outputs(
452472
user_id=user_id,
453473
product_name=product_name,
454474
check_write_permissions=check_write_permissions,
475+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
455476
)
456477
return TypeAdapter(FunctionOutputs).validate_python(result)
457478

@@ -470,6 +491,7 @@ async def delete_function_job(
470491
function_job_id=function_job_id,
471492
user_id=user_id,
472493
product_name=product_name,
494+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
473495
)
474496
assert result is None # nosec
475497

@@ -490,6 +512,7 @@ async def find_cached_function_jobs(
490512
inputs=inputs,
491513
user_id=user_id,
492514
product_name=product_name,
515+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
493516
)
494517
if result is None:
495518
return None
@@ -510,6 +533,7 @@ async def register_function_job_collection(
510533
function_job_collection=function_job_collection,
511534
user_id=user_id,
512535
product_name=product_name,
536+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
513537
)
514538
return TypeAdapter(RegisteredFunctionJobCollection).validate_python(result)
515539

@@ -528,6 +552,7 @@ async def get_function_job_collection(
528552
function_job_collection_id=function_job_collection_id,
529553
user_id=user_id,
530554
product_name=product_name,
555+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
531556
)
532557
return TypeAdapter(RegisteredFunctionJobCollection).validate_python(result)
533558

@@ -546,6 +571,7 @@ async def delete_function_job_collection(
546571
function_job_collection_id=function_job_collection_id,
547572
user_id=user_id,
548573
product_name=product_name,
574+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
549575
)
550576
assert result is None # nosec
551577

@@ -564,6 +590,7 @@ async def get_function_user_permissions(
564590
function_id=function_id,
565591
user_id=user_id,
566592
product_name=product_name,
593+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
567594
)
568595
return TypeAdapter(FunctionUserAccessRights).validate_python(result)
569596

@@ -582,6 +609,7 @@ async def get_functions_user_api_access_rights(
582609
),
583610
user_id=user_id,
584611
product_name=product_name,
612+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
585613
)
586614
return TypeAdapter(FunctionUserApiAccessRights).validate_python(result)
587615

@@ -614,6 +642,7 @@ async def set_group_permissions(
614642
read=read,
615643
write=write,
616644
execute=execute,
645+
timeout_s=_FUNCTION_RPC_TIMEOUT_SEC,
617646
)
618647
return TypeAdapter(
619648
list[tuple[FunctionID | FunctionJobID, FunctionGroupAccessRights]]

0 commit comments

Comments
 (0)