Skip to content

Commit 361f8d1

Browse files
Merge branch 'master' into add-conversations
2 parents 681a93a + 396202b commit 361f8d1

File tree

45 files changed

+900
-761
lines changed

Some content is hidden

Large commits have some of their content hidden by default. Use the search box below to find content that may be hidden.

45 files changed

+900
-761
lines changed
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from typing import Final
2+
from uuid import uuid4
3+
4+
from models_library.projects import ProjectID
5+
from models_library.projects_nodes_io import NodeID
6+
from models_library.services_types import ServiceKey, ServiceVersion
7+
from models_library.users import UserID
8+
from pydantic import TypeAdapter
9+
10+
from ..models import DaskJobID
11+
12+
13+
def generate_dask_job_id(
    service_key: ServiceKey,
    service_version: ServiceVersion,
    user_id: UserID,
    project_id: ProjectID,
    node_id: NodeID,
) -> DaskJobID:
    """Build a unique dask job ID embedding the user, project and node IDs.

    The ID is displayed in the Dask scheduler dashboard, and the trailing
    uuid4 segment guarantees uniqueness across repeated submissions of the
    same service/user/project/node combination.
    """
    # NOTE: ":" is the segment separator parse_dask_job_id relies on.
    job_id = (
        f"{service_key}:{service_version}"
        f":userid_{user_id}"
        f":projectid_{project_id}"
        f":nodeid_{node_id}"
        f":uuid_{uuid4()}"
    )
    return DaskJobID(job_id)
28+
29+
30+
# Number of ":"-separated segments produced by generate_dask_job_id.
_JOB_ID_PARTS: Final[int] = 6


def parse_dask_job_id(
    job_id: str,
) -> tuple[ServiceKey, ServiceVersion, UserID, ProjectID, NodeID]:
    """Split a dask job ID back into its components.

    Inverse of generate_dask_job_id; the trailing uuid segment is dropped.
    """
    segments = job_id.split(":")
    assert len(segments) == _JOB_ID_PARTS  # nosec
    key_part, version_part, user_part, project_part, node_part, _uuid_part = segments
    # Each tagged segment is stripped of its "<tag>_" prefix before validation.
    return (
        key_part,
        version_part,
        TypeAdapter(UserID).validate_python(user_part[len("userid_") :]),
        ProjectID(project_part[len("projectid_") :]),
        NodeID(node_part[len("nodeid_") :]),
    )
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from typing import TypeAlias

# Opaque string identifier of a dask job; its internal structure is defined
# by the job-id generation/parsing helpers elsewhere in this package.
DaskJobID: TypeAlias = str
# Resource name -> required amount (presumably e.g. CPU/RAM — confirm with callers).
DaskResources: TypeAlias = dict[str, int | float]
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# pylint: disable=too-many-positional-arguments
2+
# pylint:disable=redefined-outer-name
3+
# pylint:disable=too-many-arguments
4+
# pylint:disable=unused-argument
5+
# pylint:disable=unused-variable
6+
7+
import pytest
8+
from dask_task_models_library.container_tasks.utils import (
9+
generate_dask_job_id,
10+
parse_dask_job_id,
11+
)
12+
from faker import Faker
13+
from models_library.projects import ProjectID
14+
from models_library.projects_nodes_io import NodeID
15+
from models_library.services_types import ServiceKey, ServiceVersion
16+
from models_library.users import UserID
17+
from pydantic import TypeAdapter
18+
19+
20+
@pytest.fixture(
    params=["simcore/service/comp/some/fake/service/key", "dockerhub-style/service_key"]
)
def service_key(request: pytest.FixtureRequest) -> ServiceKey:
    # Parametrized over two key styles (simcore-style and dockerhub-style),
    # both containing "/" but no ":" so job-id parsing stays unambiguous.
    return request.param
25+
26+
27+
@pytest.fixture()
def service_version() -> ServiceVersion:
    # Fixed version string with no ":" separators; annotated as ServiceVersion
    # to match the test signature that consumes this fixture.
    return "1234.32432.2344"
30+
31+
32+
@pytest.fixture
def user_id(faker: Faker) -> UserID:
    # Random positive int, validated through the UserID type adapter.
    return TypeAdapter(UserID).validate_python(faker.pyint(min_value=1))
35+
36+
37+
@pytest.fixture
def project_id(faker: Faker) -> ProjectID:
    # Random UUID wrapped in the ProjectID type.
    return ProjectID(faker.uuid4())
40+
41+
42+
@pytest.fixture
def node_id(faker: Faker) -> NodeID:
    # Random UUID wrapped in the NodeID type.
    return NodeID(faker.uuid4())
45+
46+
47+
def test_dask_job_id_serialization(
    service_key: ServiceKey,
    service_version: ServiceVersion,
    user_id: UserID,
    project_id: ProjectID,
    node_id: NodeID,
):
    """Round-trip: a generated job ID parses back into the same components."""
    job_id = generate_dask_job_id(
        service_key, service_version, user_id, project_id, node_id
    )
    parsed = parse_dask_job_id(job_id)
    expected = (service_key, service_version, user_id, project_id, node_id)
    assert parsed == expected

packages/pytest-simcore/src/pytest_simcore/helpers/playwright.py

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,7 @@ class SocketIONodeProgressCompleteWaiter:
401401
)
402402
_last_progress_time: datetime = field(default_factory=lambda: datetime.now(tz=UTC))
403403
_received_messages: list[SocketIOEvent] = field(default_factory=list)
404+
_result: bool = False
404405

405406
def __call__(self, message: str) -> bool:
406407
# socket.io encodes messages like so
@@ -422,6 +423,7 @@ def __call__(self, message: str) -> bool:
422423
self.node_id,
423424
decoded_message.obj["service_state"],
424425
)
426+
self._result = False
425427
return True
426428
if decoded_message.name == _OSparcMessages.NODE_PROGRESS.value:
427429
node_progress_event = retrieve_node_progress_from_decoded_message(
@@ -449,13 +451,11 @@ def __call__(self, message: str) -> bool:
449451
f"{json.dumps({k: round(v, 2) for k, v in self._current_progress.items()})}",
450452
)
451453

452-
progress_completed = self.got_expected_node_progress_types() and all(
453-
round(progress, 1) == 1.0
454-
for progress in self._current_progress.values()
455-
)
456-
if progress_completed:
454+
done = self._completed_successfully()
455+
if done:
456+
self._result = True # NOTE: might have failed but it is not sure. so we set the result to True
457457
self.logger.info("✅ Service start completed successfully!! ✅")
458-
return progress_completed
458+
return done
459459

460460
time_since_last_progress = datetime.now(UTC) - self._last_progress_time
461461
if time_since_last_progress > self.max_idle_timeout:
@@ -465,19 +465,22 @@ def __call__(self, message: str) -> bool:
465465
time_since_last_progress,
466466
self.node_id,
467467
)
468+
self._result = True
468469
return True
469470

470471
return False
471472

472-
def got_expected_node_progress_types(self) -> bool:
473+
def _completed_successfully(self) -> bool:
473474
return all(
474475
progress_type in self._current_progress
475476
for progress_type in NodeProgressType.required_types_for_started_service()
477+
) and all(
478+
round(progress, 1) == 1.0 for progress in self._current_progress.values()
476479
)
477480

478481
@property
479-
def number_received_messages(self) -> int:
480-
return len(self._received_messages)
482+
def success(self) -> bool:
483+
return self._result
481484

482485

483486
def wait_for_service_endpoint_responding(
@@ -597,7 +600,9 @@ def expected_service_running(
597600
msg=f"Waiting for node to run. Timeout: {timeout}",
598601
)
599602
)
603+
600604
if is_service_legacy:
605+
waiter = None
601606
ctx.logger.info(
602607
"⚠️ Legacy service detected. We are skipping websocket messages in this case! ⚠️"
603608
)
@@ -612,7 +617,11 @@ def expected_service_running(
612617
service_running = ServiceRunning(iframe_locator=None)
613618
if press_start_button:
614619
_trigger_service_start(page, node_id)
620+
yield service_running
621+
615622
elapsed_time = arrow.utcnow() - started
623+
if waiter and not waiter.success:
624+
pytest.fail("❌ Service failed starting! ❌")
616625

617626
wait_for_service_endpoint_responding(
618627
node_id,
@@ -652,6 +661,7 @@ def wait_for_service_running(
652661
)
653662
)
654663
if is_service_legacy:
664+
waiter = None
655665
ctx.logger.info(
656666
"⚠️ Legacy service detected. We are skipping websocket messages in this case! ⚠️"
657667
)
@@ -666,6 +676,9 @@ def wait_for_service_running(
666676
if press_start_button:
667677
_trigger_service_start(page, node_id)
668678
elapsed_time = arrow.utcnow() - started
679+
680+
if waiter and not waiter.success:
681+
pytest.fail("❌ Service failed starting! ❌")
669682
wait_for_service_endpoint_responding(
670683
node_id,
671684
api_request_context=page.request,

services/api-server/requirements/_base.in

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
--requirement ../../../packages/service-library/requirements/_fastapi.in
1818

1919
aiofiles
20-
aiopg[sa]
2120
cryptography
2221
fastapi-pagination
2322
fastapi[all]

services/api-server/requirements/_base.txt

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,6 @@ aiohttp==3.11.10
5555
# -c requirements/../../../requirements/constraints.txt
5656
# -r requirements/../../../packages/simcore-sdk/requirements/_base.in
5757
# aiodocker
58-
aiopg==1.4.0
59-
# via -r requirements/_base.in
6058
aiormq==6.8.1
6159
# via aio-pika
6260
aiosignal==1.3.1
@@ -84,8 +82,6 @@ arrow==1.3.0
8482
# -r requirements/../../../packages/simcore-sdk/requirements/../../../packages/service-library/requirements/_base.in
8583
asgiref==3.8.1
8684
# via opentelemetry-instrumentation-asgi
87-
async-timeout==4.0.3
88-
# via aiopg
8985
asyncpg==0.30.0
9086
# via sqlalchemy
9187
attrs==24.2.0
@@ -513,9 +509,7 @@ psutil==6.1.0
513509
# -r requirements/../../../packages/service-library/requirements/_base.in
514510
# -r requirements/../../../packages/simcore-sdk/requirements/../../../packages/service-library/requirements/_base.in
515511
psycopg2-binary==2.9.10
516-
# via
517-
# aiopg
518-
# sqlalchemy
512+
# via sqlalchemy
519513
pycparser==2.22
520514
# via cffi
521515
pycryptodome==3.21.0
@@ -785,7 +779,6 @@ sqlalchemy==1.4.54
785779
# -r requirements/../../../packages/postgres-database/requirements/_base.in
786780
# -r requirements/../../../packages/simcore-sdk/requirements/../../../packages/postgres-database/requirements/_base.in
787781
# -r requirements/../../../packages/simcore-sdk/requirements/_base.in
788-
# aiopg
789782
# alembic
790783
starlette==0.41.3
791784
# via

services/api-server/src/simcore_service_api_server/api/dependencies/authentication.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
from models_library.products import ProductName
77
from pydantic import BaseModel, PositiveInt
88

9-
from ...db.repositories.api_keys import ApiKeysRepository, UserAndProductTuple
10-
from ...db.repositories.users import UsersRepository
9+
from ...repository.api_keys import ApiKeysRepository, UserAndProductTuple
10+
from ...repository.users import UsersRepository
1111
from .database import get_repository
1212

1313
# SEE https://swagger.io/docs/specification/authentication/basic-authentication/
@@ -22,9 +22,9 @@ class Identity(BaseModel):
2222

2323
def _create_exception() -> HTTPException:
2424
_unauthorized_headers = {
25-
"WWW-Authenticate": f'Basic realm="{basic_scheme.realm}"'
26-
if basic_scheme.realm
27-
else "Basic"
25+
"WWW-Authenticate": (
26+
f'Basic realm="{basic_scheme.realm}"' if basic_scheme.realm else "Basic"
27+
)
2828
}
2929
return HTTPException(
3030
status_code=status.HTTP_401_UNAUTHORIZED,
@@ -46,7 +46,7 @@ async def get_current_identity(
4646
if user_and_product is None:
4747
exc = _create_exception()
4848
raise exc
49-
email = await users_repo.get_active_user_email(user_and_product.user_id)
49+
email = await users_repo.get_active_user_email(user_id=user_and_product.user_id)
5050
if not email:
5151
exc = _create_exception()
5252
raise exc
Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,38 @@
11
import logging
22
from collections.abc import AsyncGenerator, Callable
3-
from typing import Annotated, cast
3+
from typing import Annotated
44

5-
from aiopg.sa import Engine
65
from fastapi import Depends
76
from fastapi.requests import Request
7+
from simcore_postgres_database.utils_aiosqlalchemy import (
8+
get_pg_engine_stateinfo,
9+
)
810
from sqlalchemy.ext.asyncio import AsyncEngine
911

10-
from ...db.events import get_asyncpg_engine
11-
from ...db.repositories import BaseRepository
12-
13-
logger = logging.getLogger(__name__)
14-
12+
from ...clients.postgres import get_engine
13+
from ...repository import BaseRepository
1514

16-
def get_db_engine(request: Request) -> Engine:
17-
return cast(Engine, request.app.state.engine)
15+
_logger = logging.getLogger(__name__)
1816

1917

2018
def get_db_asyncpg_engine(request: Request) -> AsyncEngine:
21-
return get_asyncpg_engine(request.app)
19+
return get_engine(request.app)
2220

2321

2422
def get_repository(repo_type: type[BaseRepository]) -> Callable:
2523
async def _get_repo(
26-
engine: Annotated[Engine, Depends(get_db_engine)],
24+
engine: Annotated[AsyncEngine, Depends(get_db_asyncpg_engine)],
2725
) -> AsyncGenerator[BaseRepository, None]:
2826
# NOTE: 2 different ideas were tried here with not so good
2927
# 1st one was acquiring a connection per repository which lead to the following issue https://github.com/ITISFoundation/osparc-simcore/pull/1966
3028
# 2nd one was acquiring a connection per request which works but blocks the director-v2 responsiveness once
3129
# the max amount of connections is reached
3230
# now the current solution is to acquire connection when needed.
31+
_logger.debug(
32+
"Setting up a repository. Current state of connections: %s",
33+
await get_pg_engine_stateinfo(engine),
34+
)
3335

34-
available_engines = engine.maxsize - (engine.size - engine.freesize)
35-
if available_engines <= 1:
36-
logger.warning(
37-
"Low pg connections available in pool: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
38-
engine.size,
39-
engine.size - engine.freesize,
40-
engine.freesize,
41-
engine.minsize,
42-
engine.maxsize,
43-
)
4436
yield repo_type(db_engine=engine)
4537

4638
return _get_repo
47-
48-
49-
__all__: tuple[str, ...] = (
50-
"Engine",
51-
"get_db_engine",
52-
"get_repository",
53-
)

0 commit comments

Comments
 (0)