Skip to content

Commit d8b3828

Browse files
authored
🎨 Storage with Celery: unit tests working now with storage worker implemented (#7438)
1 parent d0d22ce commit d8b3828

File tree

18 files changed

+510
-560
lines changed

18 files changed

+510
-560
lines changed

packages/pytest-simcore/src/pytest_simcore/helpers/playwright.py

Lines changed: 30 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,7 @@ def on_framereceived(payload: str | bytes) -> None:
137137
ctx.logger.debug("⬆️ Frame received: %s", payload)
138138

139139
def on_close(_: WebSocket) -> None:
140-
ctx.logger.warning(
141-
"⚠️ WebSocket closed. Attempting to reconnect..."
142-
)
140+
ctx.logger.warning("⚠️ WebSocket closed. Attempting to reconnect...")
143141
self._attempt_reconnect(ctx.logger)
144142

145143
def on_socketerror(error_msg: str) -> None:
@@ -320,9 +318,9 @@ def __call__(self, message: str) -> bool:
320318
new_progress
321319
!= self._current_progress[node_progress_event.progress_type]
322320
):
323-
self._current_progress[
324-
node_progress_event.progress_type
325-
] = new_progress
321+
self._current_progress[node_progress_event.progress_type] = (
322+
new_progress
323+
)
326324

327325
self.logger.info(
328326
"Current startup progress [expected number of node-progress-types=%d]: %s",
@@ -343,29 +341,30 @@ def __call__(self, message: str) -> bool:
343341
url = (
344342
f"https://{self.node_id}.services.{self.get_partial_product_url()}"
345343
)
346-
response = self.api_request_context.get(url, timeout=1000)
347-
level = logging.DEBUG
348-
if (response.status >= 400) and (response.status not in (502, 503)):
349-
level = logging.ERROR
350-
self.logger.log(
351-
level,
352-
"Querying service endpoint in case we missed some websocket messages. Url: %s Response: '%s' TIP: %s",
353-
url,
354-
f"{response.status}: {response.text()}",
355-
(
356-
"We are emulating the frontend; a 5XX response is acceptable if the service is not yet ready."
357-
),
358-
)
344+
with contextlib.suppress(PlaywrightTimeoutError):
345+
response = self.api_request_context.get(url, timeout=1000)
346+
level = logging.DEBUG
347+
if (response.status >= 400) and (response.status not in (502, 503)):
348+
level = logging.ERROR
349+
self.logger.log(
350+
level,
351+
"Querying service endpoint in case we missed some websocket messages. Url: %s Response: '%s' TIP: %s",
352+
url,
353+
f"{response.status}: {response.text()}",
354+
(
355+
"We are emulating the frontend; a 5XX response is acceptable if the service is not yet ready."
356+
),
357+
)
359358

360-
if response.status <= 400:
361-
# NOTE: If the response status is less than 400, it means that the backend is ready (There are some services that respond with a 3XX)
362-
if self.got_expected_node_progress_types():
363-
self.logger.warning(
364-
"⚠️ Progress bar didn't receive 100 percent but service is already running: %s. TIP: we missed some websocket messages! ⚠️", # https://github.com/ITISFoundation/osparc-simcore/issues/6449
365-
self.get_current_progress(),
366-
)
367-
return True
368-
self._last_poll_timestamp = datetime.now(UTC)
359+
if response.status <= 400:
360+
# NOTE: If the response status is less than 400, it means that the backend is ready (There are some services that respond with a 3XX)
361+
if self.got_expected_node_progress_types():
362+
self.logger.warning(
363+
"⚠️ Progress bar didn't receive 100 percent but service is already running: %s. TIP: we missed some websocket messages! ⚠️", # https://github.com/ITISFoundation/osparc-simcore/issues/6449
364+
self.get_current_progress(),
365+
)
366+
return True
367+
self._last_poll_timestamp = datetime.now(UTC)
369368

370369
return False
371370

@@ -511,19 +510,13 @@ def app_mode_trigger_next_app(page: Page) -> None:
511510

512511

513512
def wait_for_label_text(
514-
page: Page,
515-
locator: str,
516-
substring: str,
517-
timeout: int = 10000
513+
page: Page, locator: str, substring: str, timeout: int = 10000
518514
) -> Locator:
519-
page.locator(locator).wait_for(
520-
state="visible",
521-
timeout=timeout
522-
)
515+
page.locator(locator).wait_for(state="visible", timeout=timeout)
523516

524517
page.wait_for_function(
525518
f"() => document.querySelector('{locator}').innerText.includes('{substring}')",
526-
timeout=timeout
519+
timeout=timeout,
527520
)
528521

529522
return page.locator(locator)

packages/pytest-simcore/src/pytest_simcore/simcore_storage_data_models.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from sqlalchemy.dialects.postgresql import insert as pg_insert
1919
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine
2020

21-
from .helpers.faker_factories import random_project, random_user
21+
from .helpers.faker_factories import DEFAULT_FAKER, random_project, random_user
2222

2323

2424
@asynccontextmanager
@@ -62,7 +62,7 @@ async def other_user_id(sqlalchemy_async_engine: AsyncEngine) -> AsyncIterator[U
6262
@pytest.fixture
6363
async def create_project(
6464
user_id: UserID, sqlalchemy_async_engine: AsyncEngine
65-
) -> AsyncIterator[Callable[[], Awaitable[dict[str, Any]]]]:
65+
) -> AsyncIterator[Callable[..., Awaitable[dict[str, Any]]]]:
6666
created_project_uuids = []
6767

6868
async def _creator(**kwargs) -> dict[str, Any]:
@@ -71,7 +71,7 @@ async def _creator(**kwargs) -> dict[str, Any]:
7171
async with sqlalchemy_async_engine.begin() as conn:
7272
result = await conn.execute(
7373
projects.insert()
74-
.values(**random_project(**prj_config))
74+
.values(**random_project(DEFAULT_FAKER, **prj_config))
7575
.returning(sa.literal_column("*"))
7676
)
7777
row = result.one()

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py

Lines changed: 56 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
AsyncJobResult,
1313
AsyncJobStatus,
1414
)
15+
from models_library.api_schemas_rpc_async_jobs.exceptions import JobMissingError
1516
from models_library.rabbitmq_basic_types import RPCMethodName, RPCNamespace
1617
from pydantic import NonNegativeInt, TypeAdapter
1718
from tenacity import (
@@ -20,6 +21,7 @@
2021
before_sleep_log,
2122
retry,
2223
retry_if_exception_type,
24+
stop_after_attempt,
2325
stop_after_delay,
2426
wait_fixed,
2527
wait_random_exponential,
@@ -124,11 +126,11 @@ async def submit(
124126

125127

126128
_DEFAULT_RPC_RETRY_POLICY: dict[str, Any] = {
127-
"retry": retry_if_exception_type(RemoteMethodNotRegisteredError),
129+
"retry": retry_if_exception_type((RemoteMethodNotRegisteredError,)),
128130
"wait": wait_random_exponential(max=20),
129-
"stop": stop_after_delay(60),
131+
"stop": stop_after_attempt(30),
130132
"reraise": True,
131-
"before_sleep": before_sleep_log(_logger, logging.INFO),
133+
"before_sleep": before_sleep_log(_logger, logging.WARNING),
132134
}
133135

134136

@@ -146,7 +148,7 @@ async def _wait_for_completion(
146148
async for attempt in AsyncRetrying(
147149
stop=stop_after_delay(client_timeout.total_seconds()),
148150
reraise=True,
149-
retry=retry_if_exception_type(TryAgain),
151+
retry=retry_if_exception_type((TryAgain, JobMissingError)),
150152
before_sleep=before_sleep_log(_logger, logging.DEBUG),
151153
wait=wait_fixed(_DEFAULT_POLL_INTERVAL_S),
152154
):
@@ -184,45 +186,72 @@ async def result(self) -> Any:
184186
return await self._result
185187

186188

187-
async def submit_and_wait(
189+
async def wait_and_get_result(
188190
rabbitmq_rpc_client: RabbitMQRPCClient,
189191
*,
190192
rpc_namespace: RPCNamespace,
191193
method_name: str,
194+
job_id: AsyncJobId,
192195
job_id_data: AsyncJobNameData,
193196
client_timeout: datetime.timedelta,
194-
**kwargs,
195197
) -> AsyncGenerator[AsyncJobComposedResult, None]:
196-
async_job_rpc_get = None
198+
"""when a job is already submitted this will wait for its completion
199+
and return the composed result"""
197200
try:
198-
async_job_rpc_get = await submit(
199-
rabbitmq_rpc_client,
200-
rpc_namespace=rpc_namespace,
201-
method_name=method_name,
202-
job_id_data=job_id_data,
203-
**kwargs,
204-
)
205-
job_status: AsyncJobStatus | None = None
201+
job_status = None
206202
async for job_status in _wait_for_completion(
207203
rabbitmq_rpc_client,
208204
rpc_namespace=rpc_namespace,
209205
method_name=method_name,
210-
job_id=async_job_rpc_get.job_id,
206+
job_id=job_id,
211207
job_id_data=job_id_data,
212208
client_timeout=client_timeout,
213209
):
214210
assert job_status is not None # nosec
215211
yield AsyncJobComposedResult(job_status)
212+
213+
# return the result
216214
if job_status:
217215
yield AsyncJobComposedResult(
218216
job_status,
219217
result(
220218
rabbitmq_rpc_client,
221219
rpc_namespace=rpc_namespace,
222-
job_id=async_job_rpc_get.job_id,
220+
job_id=job_id,
223221
job_id_data=job_id_data,
224222
),
225223
)
224+
except (TimeoutError, CancelledError) as error:
225+
try:
226+
await cancel(
227+
rabbitmq_rpc_client,
228+
rpc_namespace=rpc_namespace,
229+
job_id=job_id,
230+
job_id_data=job_id_data,
231+
)
232+
except Exception as exc:
233+
raise exc from error # NOSONAR
234+
raise
235+
236+
237+
async def submit_and_wait(
238+
rabbitmq_rpc_client: RabbitMQRPCClient,
239+
*,
240+
rpc_namespace: RPCNamespace,
241+
method_name: str,
242+
job_id_data: AsyncJobNameData,
243+
client_timeout: datetime.timedelta,
244+
**kwargs,
245+
) -> AsyncGenerator[AsyncJobComposedResult, None]:
246+
async_job_rpc_get = None
247+
try:
248+
async_job_rpc_get = await submit(
249+
rabbitmq_rpc_client,
250+
rpc_namespace=rpc_namespace,
251+
method_name=method_name,
252+
job_id_data=job_id_data,
253+
**kwargs,
254+
)
226255
except (TimeoutError, CancelledError) as error:
227256
if async_job_rpc_get is not None:
228257
try:
@@ -235,3 +264,13 @@ async def submit_and_wait(
235264
except Exception as exc:
236265
raise exc from error
237266
raise
267+
268+
async for wait_and_ in wait_and_get_result(
269+
rabbitmq_rpc_client,
270+
rpc_namespace=rpc_namespace,
271+
method_name=method_name,
272+
job_id=async_job_rpc_get.job_id,
273+
job_id_data=job_id_data,
274+
client_timeout=client_timeout,
275+
):
276+
yield wait_and_
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from typing import Annotated
2+
3+
from fastapi import Depends, FastAPI
4+
from servicelib.fastapi.dependencies import get_app
5+
6+
from ....modules.celery import get_celery_client as _get_celery_client_from_app
7+
from ....modules.celery.client import CeleryTaskQueueClient
8+
9+
10+
def get_celery_client(
11+
app: Annotated[FastAPI, Depends(get_app)],
12+
) -> CeleryTaskQueueClient:
13+
return _get_celery_client_from_app(app)

services/storage/src/simcore_service_storage/api/rpc/_paths.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ async def compute_path_size(
2424
location_id: LocationID,
2525
path: Path,
2626
) -> AsyncJobGet:
27-
assert app # nosec
2827
task_uuid = await get_celery_client(app).send_task(
2928
remote_compute_path_size.__name__,
3029
task_context=job_id_data.model_dump(),

services/storage/src/simcore_service_storage/dsm.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
def setup_dsm(app: FastAPI) -> None:
1515
async def _on_startup() -> None:
16-
dsm_provider = DataManagerProvider(app)
16+
dsm_provider = DataManagerProvider(app=app)
1717
dsm_provider.register_builder(
1818
SimcoreS3DataManager.get_location_id(),
1919
create_simcore_s3_data_manager,
@@ -38,7 +38,7 @@ async def _on_shutdown() -> None:
3838

3939

4040
def get_dsm_provider(app: FastAPI) -> DataManagerProvider:
41-
if not app.state.dsm_provider:
41+
if not hasattr(app.state, "dsm_provider"):
4242
raise ConfigurationError(
4343
msg="DSM provider not available. Please check the configuration."
4444
)

services/storage/src/simcore_service_storage/main.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,4 @@
1717
tracing_settings=_settings.STORAGE_TRACING,
1818
)
1919

20-
_logger = logging.getLogger(__name__)
21-
2220
app = create_app(_settings)

services/storage/src/simcore_service_storage/modules/celery/_common.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22
import ssl
3+
from typing import Any
34

45
from celery import Celery # type: ignore[import-untyped]
56
from settings_library.celery import CelerySettings
@@ -8,24 +9,28 @@
89
_logger = logging.getLogger(__name__)
910

1011

12+
def _celery_configure(celery_settings: CelerySettings) -> dict[str, Any]:
13+
base_config = {
14+
"broker_connection_retry_on_startup": True,
15+
"result_expires": celery_settings.CELERY_RESULT_EXPIRES,
16+
"result_extended": True,
17+
"result_serializer": "json",
18+
"task_send_sent_event": True,
19+
"task_track_started": True,
20+
"worker_send_task_events": True,
21+
}
22+
if celery_settings.CELERY_REDIS_RESULT_BACKEND.REDIS_SECURE:
23+
base_config["redis_backend_use_ssl"] = {"ssl_cert_reqs": ssl.CERT_NONE}
24+
return base_config
25+
26+
1127
def create_app(celery_settings: CelerySettings) -> Celery:
1228
assert celery_settings
1329

14-
app = Celery(
30+
return Celery(
1531
broker=celery_settings.CELERY_RABBIT_BROKER.dsn,
1632
backend=celery_settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn(
1733
RedisDatabase.CELERY_TASKS,
1834
),
35+
**_celery_configure(celery_settings),
1936
)
20-
app.conf.broker_connection_retry_on_startup = True
21-
# NOTE: disable SSL cert validation (https://github.com/ITISFoundation/osparc-simcore/pull/7407)
22-
if celery_settings.CELERY_REDIS_RESULT_BACKEND.REDIS_SECURE:
23-
app.conf.redis_backend_use_ssl = {"ssl_cert_reqs": ssl.CERT_NONE}
24-
app.conf.result_expires = celery_settings.CELERY_RESULT_EXPIRES
25-
app.conf.result_extended = True # original args are included in the results
26-
app.conf.result_serializer = "json"
27-
app.conf.task_send_sent_event = True
28-
app.conf.task_track_started = True
29-
app.conf.worker_send_task_events = True # enable tasks monitoring
30-
31-
return app

services/storage/src/simcore_service_storage/modules/celery/worker_main.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
tracing_settings=_settings.STORAGE_TRACING,
2424
)
2525

26-
_logger = logging.getLogger(__name__)
2726

2827
assert _settings.STORAGE_CELERY
2928
app = create_celery_app(_settings.STORAGE_CELERY)

0 commit comments

Comments
 (0)