Skip to content

Commit dd74034

Browse files
authored
🐛 Bugfix/clean storage issue (ITISFoundation#2421)
* fix unbound variable * increase timeout for sync operation * fix issue with removed pipeline
1 parent f5be61c commit dd74034

File tree

4 files changed

+47
-29
lines changed

4 files changed

+47
-29
lines changed

packages/service-library/src/servicelib/monitoring.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
- TODO: see https://github.com/claws/aioprometheus
1010
"""
1111

12+
import asyncio
1213
import logging
1314
import time
1415

@@ -31,6 +32,7 @@ def middleware_factory(app_name):
3132
@web.middleware
3233
async def middleware_handler(request: web.Request, handler):
3334
# See https://prometheus.io/docs/concepts/metric_types
35+
resp = None
3436
try:
3537
request["start_time"] = time.time()
3638
request.app["REQUEST_IN_PROGRESS"].labels(
@@ -43,10 +45,12 @@ async def middleware_handler(request: web.Request, handler):
4345
# Captures raised responses (success/failures accounted with resp.status)
4446
resp = exc
4547
raise
46-
except Exception as exc: # pylint: disable=broad-except
48+
except asyncio.CancelledError as exc:
49+
# since Python 3.8, asyncio.CancelledError is a subclass of BaseException and NOT Exception
50+
resp = web.HTTPRequestTimeout(reason=str(exc))
51+
except BaseException as exc: # pylint: disable=broad-except
4752
# Prevents issue #1025.
4853
resp = web.HTTPInternalServerError(reason=str(exc))
49-
resp_time = time.time() - request["start_time"]
5054

5155
# NOTE: all access to the API (i.e. not other paths such as /socket, /x, etc.) shall return web.HTTPErrors since they are processed by error_middleware_factory
5256
log.exception(
@@ -55,9 +59,10 @@ async def middleware_handler(request: web.Request, handler):
5559
request.remote,
5660
request.method,
5761
request.path,
58-
resp_time,
62+
time.time() - request["start_time"],
5963
resp.status,
6064
)
65+
6166
finally:
6267
# metrics on the same request
6368
resp_time = time.time() - request["start_time"]
@@ -69,9 +74,10 @@ async def middleware_handler(request: web.Request, handler):
6974
app_name, request.path, request.method
7075
).dec()
7176

72-
request.app["REQUEST_COUNT"].labels(
73-
app_name, request.method, request.path, resp.status
74-
).inc()
77+
if resp:
78+
request.app["REQUEST_COUNT"].labels(
79+
app_name, request.method, request.path, resp.status
80+
).inc()
7581

7682
return resp
7783

services/director-v2/requirements/_base.in

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
# NOTE: ALL version constraints MUST be commented
55
--constraint ../../../requirements/constraints.txt
66

7-
--requirements ../../../packages/models-library/requirements/_base.in
8-
--requirements ../../../packages/settings-library/requirements/_base.in
9-
--requirements ../../../packages/postgres-database/requirements/_base.in
7+
--requirement ../../../packages/models-library/requirements/_base.in
8+
--requirement ../../../packages/settings-library/requirements/_base.in
9+
--requirement ../../../packages/postgres-database/requirements/_base.in
1010

1111
# fastapi and extensions
1212
fastapi[all]

services/director-v2/src/simcore_service_director_v2/modules/scheduler.py

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from ..models.schemas.constants import UserID
3333
from ..modules.celery import CeleryClient, CeleryTaskIn
3434
from ..utils.computations import get_pipeline_state_from_task_states
35+
from ..utils.exceptions import PipelineNotFoundError
3536
from .db.repositories import BaseRepository
3637
from .db.repositories.comp_pipelines import CompPipelinesRepository
3738
from .db.repositories.comp_runs import CompRunsRepository
@@ -204,24 +205,33 @@ async def _schedule_pipeline(
204205
user_id,
205206
)
206207

207-
pipeline_dag: nx.DiGraph = await self._get_pipeline_dag(project_id)
208-
pipeline_tasks: Dict[str, CompTaskAtDB] = await self._get_pipeline_tasks(
209-
project_id, pipeline_dag
210-
)
208+
pipeline_dag = nx.DiGraph()
209+
pipeline_tasks: Dict[str, CompTaskAtDB] = {}
210+
pipeline_result: RunningState = RunningState.UNKNOWN
211+
try:
212+
pipeline_dag = await self._get_pipeline_dag(project_id)
213+
pipeline_tasks: Dict[str, CompTaskAtDB] = await self._get_pipeline_tasks(
214+
project_id, pipeline_dag
215+
)
211216

212-
# filter out the tasks that were already completed
213-
pipeline_dag.remove_nodes_from(
214-
{
215-
node_id
216-
for node_id, t in pipeline_tasks.items()
217-
if t.state in _COMPLETED_STATES
218-
}
219-
)
217+
# filter out the tasks that were already completed
218+
pipeline_dag.remove_nodes_from(
219+
{
220+
node_id
221+
for node_id, t in pipeline_tasks.items()
222+
if t.state in _COMPLETED_STATES
223+
}
224+
)
220225

221-
# update the current status of the run
222-
pipeline_result: RunningState = await self._update_run_result(
223-
user_id, project_id, iteration, pipeline_tasks
224-
)
226+
# update the current status of the run
227+
pipeline_result = await self._update_run_result(
228+
user_id, project_id, iteration, pipeline_tasks
229+
)
230+
except PipelineNotFoundError:
231+
logger.warning(
232+
"pipeline %s does not exist in comp_pipeline table, it will be removed from scheduler",
233+
project_id,
234+
)
225235

226236
if not pipeline_dag.nodes():
227237
# there is nothing left, the run is completed, we're done here

services/web/server/src/simcore_service_webserver/storage_handlers.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import urllib
77
from typing import Any, Dict, List, Optional, Tuple, Union, cast
88

9-
from aiohttp import web
9+
from aiohttp import ClientTimeout, web
1010
from servicelib.request_keys import RQT_USERID_KEY
1111
from servicelib.rest_responses import unwrap_envelope
1212
from servicelib.rest_utils import extract_and_validate
@@ -46,7 +46,7 @@ def _resolve_storage_url(request: web.Request) -> URL:
4646
return url
4747

4848

49-
async def _request_storage(request: web.Request, method: str):
49+
async def _request_storage(request: web.Request, method: str, **kwargs):
5050
await extract_and_validate(request)
5151

5252
url = _resolve_storage_url(request)
@@ -57,7 +57,9 @@ async def _request_storage(request: web.Request, method: str):
5757
body = await request.json()
5858

5959
session = get_client_session(request.app)
60-
async with session.request(method.upper(), url, ssl=False, json=body) as resp:
60+
async with session.request(
61+
method.upper(), url, ssl=False, json=body, **kwargs
62+
) as resp:
6163
payload = await resp.json()
6264
return payload
6365

@@ -155,7 +157,7 @@ async def delete_file(request: web.Request):
155157
@login_required
156158
@permission_required("storage.files.sync")
157159
async def synchronise_meta_data_table(request: web.Request):
158-
payload = await _request_storage(request, "POST")
160+
payload = await _request_storage(request, "POST", timeout=ClientTimeout(total=300))
159161
return payload
160162

161163

0 commit comments

Comments
 (0)