Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 30 additions & 37 deletions packages/pytest-simcore/src/pytest_simcore/helpers/playwright.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,7 @@ def on_framereceived(payload: str | bytes) -> None:
ctx.logger.debug("⬆️ Frame received: %s", payload)

def on_close(_: WebSocket) -> None:
ctx.logger.warning(
"⚠️ WebSocket closed. Attempting to reconnect..."
)
ctx.logger.warning("⚠️ WebSocket closed. Attempting to reconnect...")
self._attempt_reconnect(ctx.logger)

def on_socketerror(error_msg: str) -> None:
Expand Down Expand Up @@ -320,9 +318,9 @@ def __call__(self, message: str) -> bool:
new_progress
!= self._current_progress[node_progress_event.progress_type]
):
self._current_progress[
node_progress_event.progress_type
] = new_progress
self._current_progress[node_progress_event.progress_type] = (
new_progress
)

self.logger.info(
"Current startup progress [expected number of node-progress-types=%d]: %s",
Expand All @@ -343,29 +341,30 @@ def __call__(self, message: str) -> bool:
url = (
f"https://{self.node_id}.services.{self.get_partial_product_url()}"
)
response = self.api_request_context.get(url, timeout=1000)
level = logging.DEBUG
if (response.status >= 400) and (response.status not in (502, 503)):
level = logging.ERROR
self.logger.log(
level,
"Querying service endpoint in case we missed some websocket messages. Url: %s Response: '%s' TIP: %s",
url,
f"{response.status}: {response.text()}",
(
"We are emulating the frontend; a 5XX response is acceptable if the service is not yet ready."
),
)
with contextlib.suppress(PlaywrightTimeoutError):
response = self.api_request_context.get(url, timeout=1000)
level = logging.DEBUG
if (response.status >= 400) and (response.status not in (502, 503)):
level = logging.ERROR
self.logger.log(
level,
"Querying service endpoint in case we missed some websocket messages. Url: %s Response: '%s' TIP: %s",
url,
f"{response.status}: {response.text()}",
(
"We are emulating the frontend; a 5XX response is acceptable if the service is not yet ready."
),
)

if response.status <= 400:
# NOTE: If the response status is less than 400, it means that the backend is ready (There are some services that respond with a 3XX)
if self.got_expected_node_progress_types():
self.logger.warning(
"⚠️ Progress bar didn't receive 100 percent but service is already running: %s. TIP: we missed some websocket messages! ⚠️", # https://github.com/ITISFoundation/osparc-simcore/issues/6449
self.get_current_progress(),
)
return True
self._last_poll_timestamp = datetime.now(UTC)
if response.status <= 400:
# NOTE: If the response status is less than 400, it means that the backend is ready (There are some services that respond with a 3XX)
if self.got_expected_node_progress_types():
self.logger.warning(
"⚠️ Progress bar didn't receive 100 percent but service is already running: %s. TIP: we missed some websocket messages! ⚠️", # https://github.com/ITISFoundation/osparc-simcore/issues/6449
self.get_current_progress(),
)
return True
self._last_poll_timestamp = datetime.now(UTC)

return False

Expand Down Expand Up @@ -511,19 +510,13 @@ def app_mode_trigger_next_app(page: Page) -> None:


def wait_for_label_text(
page: Page,
locator: str,
substring: str,
timeout: int = 10000
page: Page, locator: str, substring: str, timeout: int = 10000
) -> Locator:
page.locator(locator).wait_for(
state="visible",
timeout=timeout
)
page.locator(locator).wait_for(state="visible", timeout=timeout)

page.wait_for_function(
f"() => document.querySelector('{locator}').innerText.includes('{substring}')",
timeout=timeout
timeout=timeout,
)

return page.locator(locator)
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine

from .helpers.faker_factories import random_project, random_user
from .helpers.faker_factories import DEFAULT_FAKER, random_project, random_user


@asynccontextmanager
Expand Down Expand Up @@ -62,7 +62,7 @@ async def other_user_id(sqlalchemy_async_engine: AsyncEngine) -> AsyncIterator[U
@pytest.fixture
async def create_project(
user_id: UserID, sqlalchemy_async_engine: AsyncEngine
) -> AsyncIterator[Callable[[], Awaitable[dict[str, Any]]]]:
) -> AsyncIterator[Callable[..., Awaitable[dict[str, Any]]]]:
created_project_uuids = []

async def _creator(**kwargs) -> dict[str, Any]:
Expand All @@ -71,7 +71,7 @@ async def _creator(**kwargs) -> dict[str, Any]:
async with sqlalchemy_async_engine.begin() as conn:
result = await conn.execute(
projects.insert()
.values(**random_project(**prj_config))
.values(**random_project(DEFAULT_FAKER, **prj_config))
.returning(sa.literal_column("*"))
)
row = result.one()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
AsyncJobResult,
AsyncJobStatus,
)
from models_library.api_schemas_rpc_async_jobs.exceptions import JobMissingError
from models_library.rabbitmq_basic_types import RPCMethodName, RPCNamespace
from pydantic import NonNegativeInt, TypeAdapter
from tenacity import (
Expand All @@ -20,6 +21,7 @@
before_sleep_log,
retry,
retry_if_exception_type,
stop_after_attempt,
stop_after_delay,
wait_fixed,
wait_random_exponential,
Expand Down Expand Up @@ -124,11 +126,11 @@ async def submit(


_DEFAULT_RPC_RETRY_POLICY: dict[str, Any] = {
"retry": retry_if_exception_type(RemoteMethodNotRegisteredError),
"retry": retry_if_exception_type((RemoteMethodNotRegisteredError,)),
"wait": wait_random_exponential(max=20),
"stop": stop_after_delay(60),
"stop": stop_after_attempt(30),
"reraise": True,
"before_sleep": before_sleep_log(_logger, logging.INFO),
"before_sleep": before_sleep_log(_logger, logging.WARNING),
}


Expand All @@ -146,7 +148,7 @@ async def _wait_for_completion(
async for attempt in AsyncRetrying(
stop=stop_after_delay(client_timeout.total_seconds()),
reraise=True,
retry=retry_if_exception_type(TryAgain),
retry=retry_if_exception_type((TryAgain, JobMissingError)),
before_sleep=before_sleep_log(_logger, logging.DEBUG),
wait=wait_fixed(_DEFAULT_POLL_INTERVAL_S),
):
Expand Down Expand Up @@ -184,45 +186,72 @@ async def result(self) -> Any:
return await self._result


async def submit_and_wait(
async def wait_and_get_result(
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
rpc_namespace: RPCNamespace,
method_name: str,
job_id: AsyncJobId,
job_id_data: AsyncJobNameData,
client_timeout: datetime.timedelta,
**kwargs,
) -> AsyncGenerator[AsyncJobComposedResult, None]:
async_job_rpc_get = None
"""when a job is already submitted this will wait for its completion
and return the composed result"""
try:
async_job_rpc_get = await submit(
rabbitmq_rpc_client,
rpc_namespace=rpc_namespace,
method_name=method_name,
job_id_data=job_id_data,
**kwargs,
)
job_status: AsyncJobStatus | None = None
job_status = None
async for job_status in _wait_for_completion(
rabbitmq_rpc_client,
rpc_namespace=rpc_namespace,
method_name=method_name,
job_id=async_job_rpc_get.job_id,
job_id=job_id,
job_id_data=job_id_data,
client_timeout=client_timeout,
):
assert job_status is not None # nosec
yield AsyncJobComposedResult(job_status)

# return the result
if job_status:
yield AsyncJobComposedResult(
job_status,
result(
rabbitmq_rpc_client,
rpc_namespace=rpc_namespace,
job_id=async_job_rpc_get.job_id,
job_id=job_id,
job_id_data=job_id_data,
),
)
except (TimeoutError, CancelledError) as error:
try:
await cancel(
rabbitmq_rpc_client,
rpc_namespace=rpc_namespace,
job_id=job_id,
job_id_data=job_id_data,
)
except Exception as exc:
raise exc from error # NOSONAR
raise


async def submit_and_wait(
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
rpc_namespace: RPCNamespace,
method_name: str,
job_id_data: AsyncJobNameData,
client_timeout: datetime.timedelta,
**kwargs,
) -> AsyncGenerator[AsyncJobComposedResult, None]:
async_job_rpc_get = None
try:
async_job_rpc_get = await submit(
rabbitmq_rpc_client,
rpc_namespace=rpc_namespace,
method_name=method_name,
job_id_data=job_id_data,
**kwargs,
)
except (TimeoutError, CancelledError) as error:
if async_job_rpc_get is not None:
try:
Expand All @@ -235,3 +264,13 @@ async def submit_and_wait(
except Exception as exc:
raise exc from error
raise

async for wait_and_ in wait_and_get_result(
rabbitmq_rpc_client,
rpc_namespace=rpc_namespace,
method_name=method_name,
job_id=async_job_rpc_get.job_id,
job_id_data=job_id_data,
client_timeout=client_timeout,
):
yield wait_and_
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from typing import Annotated

from fastapi import Depends, FastAPI
from servicelib.fastapi.dependencies import get_app

from ....modules.celery import get_celery_client as _get_celery_client_from_app
from ....modules.celery.client import CeleryTaskQueueClient


def get_celery_client(
    app: Annotated[FastAPI, Depends(get_app)],
) -> CeleryTaskQueueClient:
    """FastAPI dependency that resolves the Celery task-queue client.

    Thin injectable wrapper: the current application is obtained via the
    ``get_app`` dependency and forwarded to the module-level accessor, so
    route handlers can declare the client with ``Depends(get_celery_client)``.
    """
    celery_client = _get_celery_client_from_app(app)
    return celery_client
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ async def compute_path_size(
location_id: LocationID,
path: Path,
) -> AsyncJobGet:
assert app # nosec
task_uuid = await get_celery_client(app).send_task(
remote_compute_path_size.__name__,
task_context=job_id_data.model_dump(),
Expand Down
4 changes: 2 additions & 2 deletions services/storage/src/simcore_service_storage/dsm.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

def setup_dsm(app: FastAPI) -> None:
async def _on_startup() -> None:
dsm_provider = DataManagerProvider(app)
dsm_provider = DataManagerProvider(app=app)
dsm_provider.register_builder(
SimcoreS3DataManager.get_location_id(),
create_simcore_s3_data_manager,
Expand All @@ -38,7 +38,7 @@ async def _on_shutdown() -> None:


def get_dsm_provider(app: FastAPI) -> DataManagerProvider:
if not app.state.dsm_provider:
if not hasattr(app.state, "dsm_provider"):
raise ConfigurationError(
msg="DSM provider not available. Please check the configuration."
)
Expand Down
2 changes: 0 additions & 2 deletions services/storage/src/simcore_service_storage/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,4 @@
tracing_settings=_settings.STORAGE_TRACING,
)

_logger = logging.getLogger(__name__)

app = create_app(_settings)
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import ssl
from typing import Any

from celery import Celery # type: ignore[import-untyped]
from settings_library.celery import CelerySettings
Expand All @@ -8,24 +9,28 @@
_logger = logging.getLogger(__name__)


def _celery_configure(celery_settings: CelerySettings) -> dict[str, Any]:
base_config = {
"broker_connection_retry_on_startup": True,
"result_expires": celery_settings.CELERY_RESULT_EXPIRES,
"result_extended": True,
"result_serializer": "json",
"task_send_sent_event": True,
"task_track_started": True,
"worker_send_task_events": True,
}
if celery_settings.CELERY_REDIS_RESULT_BACKEND.REDIS_SECURE:
base_config["redis_backend_use_ssl"] = {"ssl_cert_reqs": ssl.CERT_NONE}
return base_config


def create_app(celery_settings: CelerySettings) -> Celery:
assert celery_settings

app = Celery(
return Celery(
broker=celery_settings.CELERY_RABBIT_BROKER.dsn,
backend=celery_settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn(
RedisDatabase.CELERY_TASKS,
),
**_celery_configure(celery_settings),
)
app.conf.broker_connection_retry_on_startup = True
# NOTE: disable SSL cert validation (https://github.com/ITISFoundation/osparc-simcore/pull/7407)
if celery_settings.CELERY_REDIS_RESULT_BACKEND.REDIS_SECURE:
app.conf.redis_backend_use_ssl = {"ssl_cert_reqs": ssl.CERT_NONE}
app.conf.result_expires = celery_settings.CELERY_RESULT_EXPIRES
app.conf.result_extended = True # original args are included in the results
app.conf.result_serializer = "json"
app.conf.task_send_sent_event = True
app.conf.task_track_started = True
app.conf.worker_send_task_events = True # enable tasks monitoring

return app
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
tracing_settings=_settings.STORAGE_TRACING,
)

_logger = logging.getLogger(__name__)

assert _settings.STORAGE_CELERY
app = create_celery_app(_settings.STORAGE_CELERY)
Expand Down
Loading
Loading