Skip to content

Commit c0b6823

Browse files
authored
Merge pull request #985 from ITISFoundation/freeze-to-stage-190723
Fix/updated pipeline (#983) Unnecessary updates of the pipeline db due to project updates would stop execution of pipeline. FIX: Updates pipeline ONLY when project's DAG topology has changed FIX: right access to maintenance service NEW for dev: adds .vscode-template/launch.json template to remote debugging of webserver and storage services
2 parents 62dc8b9 + acbad61 commit c0b6823

File tree

9 files changed

+116
-36
lines changed

9 files changed

+116
-36
lines changed

.vscode-template/launch.json

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
{
2+
// Use IntelliSense to learn about possible attributes.
3+
// Hover to view descriptions of existing attributes.
4+
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
5+
"version": "0.2.0",
6+
"configurations": [
7+
{
8+
"name": "Python: Remote Attach webserver",
9+
"type": "python",
10+
"request": "attach",
11+
"port": 3001,
12+
"host": "127.0.0.1",
13+
"pathMappings": [
14+
{
15+
"localRoot": "${workspaceFolder}/services/web/server",
16+
"remoteRoot": "/devel/services/web/server"
17+
}
18+
]
19+
},
20+
{
21+
"name": "Python: Remote Attach storage",
22+
"type": "python",
23+
"request": "attach",
24+
"port": 3003,
25+
"host": "127.0.0.1",
26+
"pathMappings": [
27+
{
28+
"localRoot": "${workspaceFolder}/services/storage",
29+
"remoteRoot": "/devel/services/storage"
30+
}
31+
]
32+
}
33+
]
34+
}

.vscode-template/pytest.ini

Lines changed: 0 additions & 2 deletions
This file was deleted.

services/maintenance/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ RUN pip --no-cache install --upgrade pip wheel setuptools && \
2525
COPY --chown=jovyan:users services/maintenance/work/ $HOME/work
2626

2727
WORKDIR $HOME/work
28-
RUN jupyter trust notebooks/*
28+
RUN jupyter trust notebooks/*; mkdir data
2929

3030

3131
USER root
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
docker
2+
aiopg[sa]

services/web/server/src/simcore_service_webserver/computation_api.py

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,21 @@
99
import sqlalchemy as sa
1010
from aiohttp import web, web_exceptions
1111
from aiopg.sa import Engine
12+
from sqlalchemy import and_
13+
1214
from servicelib.application_keys import APP_DB_ENGINE_KEY
1315
from simcore_director_sdk.rest import ApiException
1416
from simcore_postgres_database.webserver_models import (comp_pipeline,
1517
comp_tasks)
16-
from sqlalchemy import and_
1718

1819
from .director import director_sdk
1920

2021
log = logging.getLogger(__file__)
2122
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
2223

2324

24-
async def _get_node_details(node_key:str, node_version:str, app: web.Application)->dict:
25+
26+
async def _get_node_details(node_key:str, node_version:str, app: web.Application) -> Dict:
2527
if "file-picker" in node_key:
2628
# create a fake file-picker schema here!!
2729
fake_node_details = {"inputs":{},
@@ -59,7 +61,7 @@ async def _get_node_details(node_key:str, node_version:str, app: web.Application
5961
log.exception("Error could not find service %s:%s", node_key, node_version)
6062
raise web_exceptions.HTTPNotFound(reason=str(err))
6163

62-
async def _build_adjacency_list(node_uuid:str, node_schema:dict, node_inputs:dict, pipeline_data:dict, dag_adjacency_list:dict, app: web.Application)->dict: # pylint: disable=too-many-arguments
64+
async def _build_adjacency_list(node_uuid:str, node_schema:Dict, node_inputs:Dict, pipeline_data:Dict, dag_adjacency_list:Dict, app: web.Application)->Dict: # pylint: disable=too-many-arguments
6365
if node_inputs is None or node_schema is None:
6466
return dag_adjacency_list
6567

@@ -89,7 +91,7 @@ async def _build_adjacency_list(node_uuid:str, node_schema:dict, node_inputs:dic
8991
dag_adjacency_list[input_node_uuid].append(node_uuid)
9092
return dag_adjacency_list
9193

92-
async def _parse_pipeline(pipeline_data:dict, app: web.Application): # pylint: disable=R0912
94+
async def _parse_pipeline(pipeline_data:Dict, app: web.Application): # pylint: disable=R0912
9395
dag_adjacency_list = dict()
9496
tasks = dict()
9597

@@ -139,29 +141,31 @@ async def _parse_pipeline(pipeline_data:dict, app: web.Application): # pylint: d
139141
return dag_adjacency_list, tasks
140142

141143
async def _set_adjacency_in_pipeline_db(db_engine: Engine, project_id: str, dag_adjacency_list: Dict):
144+
query = sa.select([comp_pipeline]).\
145+
where(comp_pipeline.c.project_id==project_id)
146+
142147
async with db_engine.acquire() as conn:
143-
query = sa.select([comp_pipeline]).\
144-
where(comp_pipeline.c.project_id==project_id)
145148
result = await conn.execute(query)
146149
pipeline = await result.first()
147150

148-
if pipeline is None:
149-
# pylint: disable=no-value-for-parameter
150-
# let's create one then
151-
query = comp_pipeline.insert().\
152-
values(project_id=project_id,
153-
dag_adjacency_list=dag_adjacency_list,
154-
state=0)
155-
156-
log.debug("Pipeline object created")
157-
else:
158-
# let's modify it
159-
log.debug("Pipeline object found")
160-
#pylint: disable=no-value-for-parameter
161-
query = comp_pipeline.update().\
162-
where(comp_pipeline.c.project_id == project_id).\
163-
values(state=0,
164-
dag_adjacency_list=dag_adjacency_list)
151+
if pipeline is None:
152+
# pylint: disable=no-value-for-parameter
153+
# let's create one then
154+
query = comp_pipeline.insert().\
155+
values(project_id=project_id,
156+
dag_adjacency_list=dag_adjacency_list,
157+
state=0)
158+
log.debug("Pipeline object created")
159+
else:
160+
# let's modify it
161+
log.debug("Pipeline object found")
162+
#pylint: disable=no-value-for-parameter
163+
query = comp_pipeline.update().\
164+
where(comp_pipeline.c.project_id == project_id).\
165+
values(state=0,
166+
dag_adjacency_list=dag_adjacency_list)
167+
168+
async with db_engine.acquire() as conn:
165169
await conn.execute(query)
166170

167171
async def _set_tasks_in_tasks_db(db_engine: Engine, project_id: str, tasks: Dict):
@@ -219,10 +223,11 @@ async def _set_tasks_in_tasks_db(db_engine: Engine, project_id: str, tasks: Dict
219223
async def update_pipeline_db(app: web.Application, project_id, pipeline_data):
220224
db_engine = app[APP_DB_ENGINE_KEY]
221225

222-
log.debug("Client calls update_pipeline with project id: %s, pipeline data %s", project_id, pipeline_data)
226+
log.info("Pipeline has been updated for project %s", project_id)
227+
log.debug("Updating pipeline: %s", pformat(pipeline_data))
223228
dag_adjacency_list, tasks = await _parse_pipeline(pipeline_data, app)
224229

225-
log.debug("Pipeline parsed:\nlist: %s\ntasks: %s", str(dag_adjacency_list), str(tasks))
230+
log.debug("Pipeline parsed:\nlist: %s\ntasks: %s", pformat(dag_adjacency_list), pformat(tasks))
226231
await _set_adjacency_in_pipeline_db(db_engine, project_id, dag_adjacency_list)
227232
await _set_tasks_in_tasks_db(db_engine, project_id, tasks)
228233

services/web/server/src/simcore_service_webserver/projects/projects_db.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,15 @@
3333
DB_EXCLUSIVE_COLUMNS = ["type", "id", "published"]
3434

3535
# TODO: check here how schema to model db works!?
36-
def _convert_to_db_names(project_data: Dict) -> Dict:
36+
def _convert_to_db_names(project_document_data: Dict) -> Dict:
3737
converted_args = {}
38-
for key, value in project_data.items():
38+
for key, value in project_document_data.items():
3939
converted_args[ChangeCase.camel_to_snake(key)] = value
4040
return converted_args
4141

42-
def _convert_to_schema_names(project_db_data: Mapping) -> Dict:
42+
def _convert_to_schema_names(project_database_data: Mapping) -> Dict:
4343
converted_args = {}
44-
for key, value in project_db_data.items():
44+
for key, value in project_database_data.items():
4545
if key in DB_EXCLUSIVE_COLUMNS:
4646
continue
4747
converted_value = value
@@ -272,6 +272,18 @@ async def get_template_project(self, project_uuid: str, *, only_published=False)
272272

273273
return template_prj
274274

275+
async def get_project_workbench(self, project_uuid: str):
276+
async with self.engine.acquire() as conn:
277+
query = select([projects.c.workbench]).where(
278+
projects.c.uuid == project_uuid
279+
)
280+
result = await conn.execute(query)
281+
row = await result.first()
282+
if row:
283+
return row[projects.c.workbench]
284+
return {}
285+
286+
275287
async def update_user_project(self, project_data: Dict, user_id: str, project_uuid: str):
276288
""" updates a project from a user
277289

services/web/server/src/simcore_service_webserver/projects/projects_handlers.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from ..login.decorators import RQT_USERID_KEY, login_required
1313
from ..security_api import check_permission
1414
from ..storage_api import delete_data_folders_of_project
15+
from .projects_utils import is_graph_equal
1516
from .projects_api import validate_project
1617
from .projects_db import APP_PROJECT_DBAPI
1718
from .projects_exceptions import (ProjectInvalidRightsError,
@@ -79,7 +80,7 @@ async def create_projects(request: web.Request):
7980
# update metadata (uuid, timestamps, ownership) and save
8081
await db.add_project(project, user_id, force_as_template=as_template is not None)
8182

82-
# Every change in projects workbench needs to be reflected in the pipeline db
83+
# This is a new project and every new graph needs to be reflected in the pipeline db
8384
await update_pipeline_db(request.app, project["uuid"], project["workbench"])
8485

8586
except ValidationError:
@@ -186,10 +187,12 @@ async def replace_project(request: web.Request):
186187
try:
187188
validate_project(request.app, new_project)
188189

190+
previous_workbench = await db.get_project_workbench(project_uuid)
189191
await db.update_user_project(new_project, user_id, project_uuid)
190192

191-
# Every change in projects workbench needs to be reflected in the pipeline db
192-
await update_pipeline_db(request.app, project_uuid, new_project["workbench"])
193+
if not is_graph_equal(new_project["workbench"], previous_workbench):
194+
# Every change in the pipeline workflow needs to be reflected in the pipeline db
195+
await update_pipeline_db(request.app, project_uuid, new_project["workbench"])
193196

194197
except ValidationError:
195198
raise web.HTTPBadRequest

services/web/server/src/simcore_service_webserver/projects/projects_utils.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,29 @@ def _normalize_value(s):
8282
inputs.update(new_inputs)
8383

8484
return project
85+
86+
87+
def is_graph_equal(lhs_workbench: Dict, rhs_workbench: Dict) -> bool:
88+
""" Checks whether both workbench contain the same graph
89+
90+
Two graphs are the same when the same topology (i.e. nodes and edges)
91+
and the ports at each node have same values/connections
92+
"""
93+
try:
94+
assert set(rhs_workbench.keys()) == set(lhs_workbench.keys())
95+
for node_id, node in rhs_workbench.items():
96+
# same nodes
97+
assert all(node.get(k) == lhs_workbench[node_id].get(k)
98+
for k in ['key', 'version']
99+
)
100+
101+
# same connectivity (edges)
102+
assert set(node.get('inputNodes')) == set(lhs_workbench[node_id].get('inputNodes'))
103+
104+
# same input values
105+
for port_id, port in node.get("inputs", {}).items():
106+
assert port == lhs_workbench[node_id].get("inputs", {}).get(port_id)
107+
108+
except (AssertionError, TypeError, AttributeError):
109+
return False
110+
return True

services/web/server/src/simcore_service_webserver/studies_access.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ async def access_study(request: web.Request) -> web.Response:
144144

145145
template_project = await get_public_project(request.app, project_id)
146146
if not template_project:
147-
raise web.HTTPNotFound(reason=f"Invalid public study [{project_id}]")
147+
raise web.HTTPNotFound(reason=f"Requested study ({project_id}) has not been published.\
148+
Please contact the data curators for more information.")
148149

149150
user = None
150151
is_anonymous_user = await is_anonymous(request)

0 commit comments

Comments
 (0)