
Commit 6637e08

GitHK and Andrei Neagu authored
Computational services can have an interactive service as input (#1825)
* removed config.DEBUG, which caused issues
* added some logging in debug mode, in case the services fail to start
* sidecar properly unzips the input if it is a zipfile
* changed the recreate into a reset status
* removing the input only if it is a zip
* log collection is now protected by a lock
* trying to fix logs recovery
* adding comments

Co-authored-by: Andrei Neagu <[email protected]>
1 parent: 77b93cd

File tree: 5 files changed (+80 -21 lines changed)

  services/director/src/simcore_service_director/producer.py
  services/sidecar/src/simcore_service_sidecar/executor.py
  services/sidecar/tests/integration/test_sidecar.py
  services/web/server/src/simcore_service_webserver/computation_api.py
  services/web/server/src/simcore_service_webserver/projects/projects_api.py

services/director/src/simcore_service_director/producer.py

Lines changed: 7 additions & 4 deletions

@@ -264,9 +264,8 @@ async def _create_docker_service_params(
             docker_params["labels"]["port"] = docker_params["labels"][
                 f"traefik.http.services.{service_name}.loadbalancer.server.port"
             ] = str(param["value"])
-        elif (
-            param["type"] == "EndpointSpec"
-        ):  # REST-API compatible
+        # REST-API compatible
+        elif param["type"] == "EndpointSpec":
             if "Ports" in param["value"]:
                 if (
                     isinstance(param["value"]["Ports"], list)
@@ -803,7 +802,11 @@ async def start_service(
     node_details = containers_meta_data[0]
     if config.MONITORING_ENABLED:
         service_started(
-            app, user_id, service_key, service_tag, "DYNAMIC",
+            app,
+            user_id,
+            service_key,
+            service_tag,
+            "DYNAMIC",
         )
     # we return only the info of the main service
     return node_details
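The first hunk only reshapes the elif around the "EndpointSpec" branch. For context, here is a minimal, hypothetical sketch of how such a setting could be folded into the docker service parameters; the apply_endpoint_spec helper and the "endpoint_spec" key are illustrative assumptions, not the director's actual API.

# Hypothetical, simplified sketch (not the director's actual code).
from typing import Any, Dict


def apply_endpoint_spec(docker_params: Dict[str, Any], param: Dict[str, Any]) -> None:
    # REST-API compatible: only act on EndpointSpec-typed settings
    if param["type"] != "EndpointSpec":
        return
    if "Ports" in param["value"] and isinstance(param["value"]["Ports"], list):
        # copy the published/target port pairs into the service endpoint spec
        docker_params.setdefault("endpoint_spec", {})["Ports"] = param["value"]["Ports"]


params = [{"type": "EndpointSpec", "value": {"Ports": [{"PublishedPort": 8080, "TargetPort": 8080}]}}]
docker_params: Dict[str, Any] = {"labels": {}}
for param in params:
    apply_endpoint_spec(docker_params, param)
print(docker_params)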

services/sidecar/src/simcore_service_sidecar/executor.py

Lines changed: 8 additions & 0 deletions

@@ -5,6 +5,8 @@
 import time
 from pathlib import Path
 from typing import Dict, Optional
+import zipfile
+import os

 import aiopg
 import attr
@@ -164,6 +166,12 @@ async def _process_task_input(self, port: node_ports.Port, input_ports: Dict):
                     final_path,
                     path,
                 )
+                # check if the file is a zip, in that case extract all
+                if zipfile.is_zipfile(final_path):
+                    with zipfile.ZipFile(final_path, "r") as zip_obj:
+                        zip_obj.extractall(final_path.parents[0])
+                    # finally remove the zip archive
+                    os.remove(final_path)
             else:
                 input_ports[port.key] = port_value
         else:
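The new sidecar behaviour only touches the standard library. A minimal standalone sketch of the same unzip-on-download pattern (unpack_if_zip is a hypothetical helper name, not part of the sidecar):

# Standalone sketch of the unzip-on-download behaviour, standard library only.
import os
import zipfile
from pathlib import Path


def unpack_if_zip(final_path: Path) -> Path:
    """If final_path is a zip archive, extract it next to itself and drop the archive.

    Returns the path that now holds the input data.
    """
    if not zipfile.is_zipfile(final_path):
        return final_path
    with zipfile.ZipFile(final_path, "r") as zip_obj:
        # final_path.parents[0] is the same directory as final_path.parent
        zip_obj.extractall(final_path.parents[0])
    # remove the archive so only the extracted payload remains
    os.remove(final_path)
    return final_path.parents[0]

Note that zipfile.is_zipfile() inspects the file content, so the check works even when the downloaded input has no .zip extension.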

services/sidecar/tests/integration/test_sidecar.py

Lines changed: 32 additions & 10 deletions

@@ -5,16 +5,17 @@
 import asyncio
 import inspect
 import json
+from collections import deque
 from pathlib import Path
 from typing import Any, Dict, List, Tuple
 from uuid import uuid4

 import aio_pika
 import pytest
 import sqlalchemy as sa
+from simcore_sdk.models.pipeline_models import ComputationalPipeline, ComputationalTask
 from yarl import URL

-from simcore_sdk.models.pipeline_models import ComputationalPipeline, ComputationalTask
 from simcore_service_sidecar import config, utils

 SIMCORE_S3_ID = 0
@@ -80,9 +81,29 @@ def sidecar_config(
     config.RABBIT_CONFIG = rabbit_config


-def _assert_incoming_data_logs(
+class LockedCollector:
+    __slots__ = ("_lock", "_list")
+
+    def __init__(self):
+        self._lock = asyncio.Lock()
+        self._list = deque()
+
+    async def is_empty(self):
+        async with self._lock:
+            return len(self._list) == 0
+
+    async def append(self, item):
+        async with self._lock:
+            self._list.append(item)
+
+    async def as_list(self) -> List:
+        async with self._lock:
+            return list(self._list)
+
+
+async def _assert_incoming_data_logs(
     tasks: List[str],
-    incoming_data: List[Dict[str, str]],
+    incoming_data: LockedCollector,
     user_id: int,
     project_id: str,
     service_repo: str,
@@ -94,7 +115,7 @@ def _assert_incoming_data_logs(
     tasks_logs = {task: [] for task in tasks}
     progress_logs = {task: [] for task in tasks}
     instrumentation_messages = {task: [] for task in tasks}
-    for message in incoming_data:
+    for message in await incoming_data.as_list():
         if "metrics" in message:
             # instrumentation message
             instrumentation_messages[message["service_uuid"]].append(message)
@@ -338,13 +359,14 @@ async def test_run_services(
     :param osparc_service: Fixture defined in pytest-simcore.docker_registry. Uses parameters service_repo, service_tag
     :type osparc_service: Dict[str, str]
     """
-    incoming_data = []
+    incoming_data = LockedCollector()

     async def rabbit_message_handler(message: aio_pika.IncomingMessage):
-        data = json.loads(message.body)
-        incoming_data.append(data)
+        async with message.process():
+            data = json.loads(message.body)
+            await incoming_data.append(data)

-    await rabbit_queue.consume(rabbit_message_handler, exclusive=True, no_ack=True)
+    await rabbit_queue.consume(rabbit_message_handler, exclusive=True)

     job_id = 1

@@ -353,7 +375,7 @@ async def rabbit_message_handler(message: aio_pika.IncomingMessage):
     # runs None first
     next_task_nodes = await cli.run_sidecar(job_id, user_id, pipeline.project_id, None)
     await asyncio.sleep(5)
-    assert not incoming_data
+    assert await incoming_data.is_empty()
    assert next_task_nodes
    assert len(next_task_nodes) == 1
    assert next_task_nodes[0] == next(iter(pipeline_cfg))
@@ -370,7 +392,7 @@ async def rabbit_message_handler(message: aio_pika.IncomingMessage):
        dag.extend(pipeline_cfg[key]["next"])
    assert next_task_nodes == dag
    await asyncio.sleep(5)  # wait a little bit for logs to come in
-    _assert_incoming_data_logs(
+    await _assert_incoming_data_logs(
        list(pipeline_cfg.keys()),
        incoming_data,
        user_id,
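The LockedCollector introduced above is a deque guarded by an asyncio.Lock, so the RabbitMQ handler can append while the test takes consistent snapshots. A minimal usage sketch outside the test (the concurrent writers below are stand-ins for the rabbit message handler):

# Minimal sketch of the lock-guarded collector pattern used by the test.
import asyncio
from collections import deque
from typing import Any, List


class LockedCollector:
    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._list: deque = deque()

    async def append(self, item: Any) -> None:
        async with self._lock:
            self._list.append(item)

    async def as_list(self) -> List[Any]:
        async with self._lock:
            return list(self._list)


async def main() -> None:
    collector = LockedCollector()
    # ten concurrent writers, standing in for incoming rabbit messages
    await asyncio.gather(*(collector.append(i) for i in range(10)))
    snapshot = await collector.as_list()  # consistent copy, safe to iterate
    assert len(snapshot) == 10


asyncio.run(main())

Dropping no_ack=True and wrapping the handler body in async with message.process() also means each message is acknowledged only after the handler ran without raising, which should help against lost log messages.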

services/web/server/src/simcore_service_webserver/computation_api.py

Lines changed: 18 additions & 6 deletions

@@ -262,6 +262,8 @@ async def _set_adjacency_in_pipeline_db(
 async def _set_tasks_in_tasks_db(
     db_engine: Engine, project_id: str, tasks: Dict[str, Dict], replace_pipeline=True
 ):
+    """The replace_pipeline flag is misleading: it should be interpreted
+    as "the RUN button was pressed in the UI"."""
     # pylint: disable=no-value-for-parameter

     async def _task_already_exists(
@@ -328,14 +330,24 @@ async def _update_task(
         result = await conn.execute(query)
         tasks_rows = await result.fetchall()

-        # prune database from invalid tasks
+        # no longer prune the database from invalid tasks;
+        # mark comp tasks with job_id == NULL and set status to 0,
+        # effectively marking a reset of the pipeline without losing
+        # inputs from comp services
         for task_row in tasks_rows:
+            # for some reason the outputs are not present in the
+            # tasks outputs. copy them over (will be used below)
+            tasks[task_row.node_id]["outputs"] = task_row.outputs
             if not task_row.node_id in tasks:
-                query = comp_tasks.delete().where(
-                    and_(
-                        comp_tasks.c.project_id == project_id,
-                        comp_tasks.c.node_id == task_row.node_id,
+                query = (
+                    comp_tasks.update()
+                    .where(
+                        and_(
+                            comp_tasks.c.project_id == project_id,
+                            comp_tasks.c.node_id == task_row.node_id,
+                        )
                     )
+                    .values(job_id=None, state=UNKNOWN)
                 )
                 await conn.execute(query)

@@ -405,7 +417,7 @@ async def update_pipeline_db(
     project_data: Dict,
     replace_pipeline: bool = True,
 ) -> None:
-    """ Updates entries in comp_pipeline and comp_task pg tables for a given project
+    """Updates entries in comp_pipeline and comp_task pg tables for a given project

     :param replace_pipeline: Fully replaces instead of partial updates of existing entries, defaults to True
     """

services/web/server/src/simcore_service_webserver/projects/projects_api.py

Lines changed: 15 additions & 1 deletion

@@ -125,6 +125,8 @@ async def start_project_interactive_services(
     running_services = await director_api.get_running_interactive_services(
         request.app, user_id, project["uuid"]
     )
+    log.debug("Running services %s", running_services)
+
     running_service_uuids = [x["service_uuid"] for x in running_services]
     # now start them if needed
     project_needed_services = {
@@ -133,6 +135,7 @@ async def start_project_interactive_services(
         if _is_node_dynamic(service["key"])
         and service_uuid not in running_service_uuids
     }
+    log.debug("Services to start %s", project_needed_services)

     start_service_tasks = [
         director_api.start_service(
@@ -145,7 +148,18 @@ async def start_project_interactive_services(
         )
         for service_uuid, service in project_needed_services.items()
     ]
-    await logged_gather(*start_service_tasks, reraise=True)
+
+    result = await logged_gather(*start_service_tasks, reraise=True)
+    log.debug("Services start result %s", result)
+    for entry in result:
+        # if the status is present in the results of the start_service
+        # it means that the API call failed
+        # also it is enforced that the status is different from 200 OK
+        if "status" not in entry:
+            continue
+
+        if entry["status"] != 200:
+            log.error("Error while starting dynamic service %s", entry)


 async def delete_project(request: web.Request, project_uuid: str, user_id: int) -> None:
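The result handling relies on the convention that a failed start_service call yields a dict carrying a non-200 "status" field rather than raising. A self-contained sketch of that pattern, with asyncio.gather standing in for the project's logged_gather helper and a made-up start_service coroutine producing the payloads:

# Sketch of gathering service-start results and logging the failed ones.
import asyncio
import logging
from typing import Any, Dict, List

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)


async def start_service(service_uuid: str, ok: bool) -> Dict[str, Any]:
    # a successful start returns the service details;
    # a failed one returns a dict that carries a "status" field
    if ok:
        return {"service_uuid": service_uuid}
    return {"status": 500, "message": f"could not start {service_uuid}"}


async def main() -> None:
    tasks = [start_service("uuid-1", True), start_service("uuid-2", False)]
    result: List[Dict[str, Any]] = await asyncio.gather(*tasks)
    log.debug("Services start result %s", result)
    for entry in result:
        # entries without a "status" field started fine
        if "status" not in entry:
            continue
        if entry["status"] != 200:
            log.error("Error while starting dynamic service %s", entry)


asyncio.run(main())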
