
Commit 844cb17

🐛 Fixes several issues in director-v2 integration test (#2626)

1 parent: 2d19a43 · commit: 844cb17

18 files changed: +215 −282 lines


packages/pytest-simcore/src/pytest_simcore/docker_swarm.py

Lines changed: 57 additions & 38 deletions

@@ -2,20 +2,23 @@
 # pylint:disable=unused-argument
 # pylint:disable=redefined-outer-name
 
+#
+# NOTE this file must be py3.6 compatible because it is used by the director
+#
 import json
 import logging
 import subprocess
-import time
 from datetime import datetime
 from pathlib import Path
 from pprint import pprint
-from typing import Dict, Iterator, Type
+from typing import Dict, Iterator
 
 import docker
 import pytest
 import tenacity
 import yaml
 from docker.errors import APIError
+from tenacity import Retrying
 from tenacity.before_sleep import before_sleep_log
 from tenacity.stop import stop_after_attempt, stop_after_delay
 from tenacity.wait import wait_exponential, wait_fixed
@@ -25,6 +28,15 @@
 
 log = logging.getLogger(__name__)
 
+_MINUTE: int = 60  # secs
+HEADER: str = "{:-^50}"
+
+
+DEFAULT_RETRY_POLICY = dict(
+    wait=wait_exponential(),
+    stop=stop_after_delay(15),
+)
+
 
 class _NotInSwarmException(Exception):
     pass
@@ -34,6 +46,10 @@ class _StillInSwarmException(Exception):
     pass
 
 
+class _ResourceStillNotRemoved(Exception):
+    pass
+
+
 def _in_docker_swarm(
     docker_client: docker.client.DockerClient, raise_error: bool = False
 ) -> bool:
@@ -48,14 +64,6 @@ def _in_docker_swarm(
         return True
 
 
-def _attempt_for(retry_error_cls: Type[Exception]) -> tenacity.Retrying:
-    return tenacity.Retrying(
-        wait=wait_exponential(),
-        stop=stop_after_delay(15),
-        retry_error_cls=retry_error_cls,
-    )
-
-
 @pytest.fixture(scope="session")
 def docker_client() -> Iterator[docker.client.DockerClient]:
     client = docker.from_env()
@@ -71,7 +79,9 @@ def keep_docker_up(request) -> bool:
 def docker_swarm(
     docker_client: docker.client.DockerClient, keep_docker_up: Iterator[bool]
 ) -> Iterator[None]:
-    for attempt in _attempt_for(retry_error_cls=_NotInSwarmException):
+    for attempt in Retrying(
+        retry_error_cls=_NotInSwarmException, **DEFAULT_RETRY_POLICY
+    ):
         with attempt:
             if not _in_docker_swarm(docker_client):
                 docker_client.swarm.init(advertise_addr=get_ip())
@@ -185,6 +195,8 @@ def docker_stack(
         "services": [service.name for service in docker_client.services.list()],
     }
 
+    ## TEAR DOWN ----------------------
+
     _print_services(docker_client, "[AFTER TEST]")
 
     if keep_docker_up:
@@ -205,46 +217,53 @@ def docker_stack(
 
     # make down
    # NOTE: remove them in reverse order since stacks share common networks
-    WAIT_BEFORE_RETRY_SECS = 1
 
-    HEADER = "{:-^20}"
     stacks.reverse()
     for _, stack, _ in stacks:
 
         try:
-            subprocess.run(f"docker stack remove {stack}", shell=True, check=True)
+            subprocess.run(
+                f"docker stack remove {stack}",
+                shell=True,
+                check=True,
+                capture_output=True,
+            )
         except subprocess.CalledProcessError as err:
             log.warning(
                 "Ignoring failure while executing '%s' (returned code %d):\n%s\n%s\n%s\n%s\n",
                 err.cmd,
                 err.returncode,
                 HEADER.format("stdout"),
-                err.stdout,
+                err.stdout.decode("utf8") if err.stdout else "",
                 HEADER.format("stderr"),
-                err.stderr,
+                err.stderr.decode("utf8") if err.stderr else "",
             )
 
-        while docker_client.services.list(
-            filters={"label": f"com.docker.stack.namespace={stack}"}
-        ):
-            time.sleep(WAIT_BEFORE_RETRY_SECS)
-
-        while docker_client.networks.list(
-            filters={"label": f"com.docker.stack.namespace={stack}"}
-        ):
-            time.sleep(WAIT_BEFORE_RETRY_SECS)
-
-        while docker_client.containers.list(
-            filters={"label": f"com.docker.stack.namespace={stack}"}
-        ):
-            time.sleep(WAIT_BEFORE_RETRY_SECS)
-
-        for attempt in _attempt_for(retry_error_cls=APIError):
-            with attempt:
-                list_of_volumes = docker_client.volumes.list(
-                    filters={"label": f"com.docker.stack.namespace={stack}"}
-                )
-                for volume in list_of_volumes:
-                    volume.remove(force=True)
+        # Waits until all resources get removed, or forces their removal
+        # The check order is intentional because some resources depend on others to be removed first,
+        # e.g. cannot remove networks/volumes used by running containers
+        for resource_name in ("services", "containers", "volumes", "networks"):
+            resource_client = getattr(docker_client, resource_name)
+
+            for attempt in Retrying(
+                wait=wait_exponential(),
+                stop=stop_after_delay(3 * _MINUTE),
+                before_sleep=before_sleep_log(log, logging.WARNING),
+                reraise=True,
+            ):
+                with attempt:
+                    pending = resource_client.list(
+                        filters={"label": f"com.docker.stack.namespace={stack}"}
+                    )
+                    if pending:
+                        if resource_name in ("volumes",):
+                            # WARNING: removing volumes of this stack might be a problem when shared between different stacks
+                            # NOTE: volumes are removed to avoid mixing configs (e.g. postgres db credentials)
+                            for resource in pending:
+                                resource.remove(force=True)
+
+                        raise _ResourceStillNotRemoved(
+                            f"Waiting for {len(pending)} {resource_name} to shutdown: {pending}."
+                        )
 
     _print_services(docker_client, "[AFTER REMOVED]")
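The teardown above replaces open-ended `while ...: time.sleep(...)` polling with one bounded tenacity loop per resource type. A minimal standalone sketch of that pattern follows; it assumes only the `docker` and `tenacity` packages, and `wait_for_stack_removed`, `ResourceStillPresentError`, and the 3-minute budget are illustrative names/values, not taken from the commit:

# Sketch of the bounded-retry teardown pattern introduced above.
# `wait_for_stack_removed` is an illustrative name, not from the commit.
import logging

import docker
from tenacity import Retrying
from tenacity.before_sleep import before_sleep_log
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_exponential

log = logging.getLogger(__name__)


class ResourceStillPresentError(Exception):
    pass


def wait_for_stack_removed(client: docker.DockerClient, stack: str) -> None:
    label = f"com.docker.stack.namespace={stack}"
    for attempt in Retrying(
        wait=wait_exponential(),  # back off 1s, 2s, 4s, ... between checks
        stop=stop_after_delay(3 * 60),  # hard 3-minute budget instead of looping forever
        before_sleep=before_sleep_log(log, logging.WARNING),
        reraise=True,  # surface the last exception instead of tenacity.RetryError
    ):
        with attempt:
            pending = client.services.list(filters={"label": label})
            if pending:
                # raising re-enters the loop until the stop condition expires
                raise ResourceStillPresentError(f"{len(pending)} services still up")

Compared with the removed `while`/`sleep` loops, a stuck resource now ends the session with a clear exception after a fixed budget instead of hanging CI indefinitely.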

packages/pytest-simcore/src/pytest_simcore/simcore_services.py

Lines changed: 7 additions & 6 deletions

@@ -12,10 +12,9 @@
 import tenacity
 from _pytest.monkeypatch import MonkeyPatch
 from aiohttp.client import ClientTimeout
-from tenacity.after import after_log
 from tenacity.before_sleep import before_sleep_log
 from tenacity.stop import stop_after_delay
-from tenacity.wait import wait_random
+from tenacity.wait import wait_exponential
 from yarl import URL
 
 from .helpers.utils_docker import get_ip, get_service_published_port
@@ -118,17 +117,19 @@ async def simcore_services_ready(
 
 
 _MINUTE: Final[int] = 60
+
 # HELPERS --
 @tenacity.retry(
-    wait=wait_random(2, 15),
+    wait=wait_exponential(),
     stop=stop_after_delay(5 * _MINUTE),
     before_sleep=before_sleep_log(log, logging.WARNING),
-    after=after_log(log, logging.ERROR),
     reraise=True,
 )
 async def wait_till_service_responsive(service_name: str, endpoint: URL):
-    print(f"trying to connect with '{service_name}' through '{endpoint}'")
-    async with aiohttp.ClientSession(timeout=ClientTimeout(total=1)) as session:
+    FAST = ClientTimeout(total=1)  # type: ignore
+
+    print(f"Trying to connect with '{service_name}' through '{endpoint}'")
+    async with aiohttp.ClientSession(timeout=FAST) as session:
         async with session.get(endpoint) as resp:
             # NOTE: Health-check endpoint require only a
             # status code 200 (see e.g. services/web/server/docker/healthcheck.py)
services/director-v2/src/simcore_service_director_v2/api/routes/running_interactive.py

Lines changed: 1 addition & 1 deletion

@@ -47,7 +47,7 @@ async def start_interactive_service(
             "simcore/services/dynamic/3dviewer",
         ],
     ),
-    service_version: str = Query(
+    service_tag: str = Query(
         ...,
         description="The tag/version of the service",
         regex=VERSION_RE,
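In FastAPI the parameter name is the query-string key, so this rename changes the accepted key from `service_version` to `service_tag`. A minimal sketch of a required, regex-validated query parameter; the route path and the `VERSION_RE` value here are illustrative, not from the repository:

# Minimal FastAPI sketch of a required, regex-validated query parameter.
# The route path and VERSION_RE pattern are illustrative.
from fastapi import FastAPI, Query

app = FastAPI()

VERSION_RE = r"^(0|[1-9]\d*)(\.(0|[1-9]\d*)){2}$"  # e.g. "1.0.3"


@app.post("/running_interactive_services")
async def start_interactive_service(
    service_tag: str = Query(
        ...,  # the ellipsis marks the parameter as required
        description="The tag/version of the service",
        regex=VERSION_RE,
    ),
) -> dict:
    # FastAPI returns 422 automatically when the value is missing or malformed
    return {"service_tag": service_tag}

With this sketch, a request sending only the old `?service_version=1.0.3` key would fail validation with a 422, while `?service_tag=1.0.3` succeeds.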

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/task.py

Lines changed: 16 additions & 5 deletions

@@ -1,4 +1,5 @@
 import asyncio
+import contextlib
 import logging
 import traceback
 from asyncio import Lock, Queue, Task, sleep
@@ -314,7 +315,10 @@ async def observing_single_service(service_name: str) -> None:
             )
             if resource_marked_as_locked:
                 # fire and forget about the task
-                asyncio.create_task(observing_single_service(service_name))
+                asyncio.create_task(
+                    observing_single_service(service_name),
+                    name=f"observe {service_name}",
+                )
 
         logger.info("Scheduler 'trigger observation queue task' was shut down")
 
@@ -362,9 +366,12 @@ async def start(self) -> None:
         # run as a background task
         logger.info("Starting dynamic-sidecar scheduler")
         self._keep_running = True
-        self._scheduler_task = asyncio.create_task(self._run_scheduler_task())
+        self._scheduler_task = asyncio.create_task(
+            self._run_scheduler_task(), name="dynamic-scheduler"
+        )
         self._trigger_observation_queue_task = asyncio.create_task(
-            self._run_trigger_observation_queue_task()
+            self._run_trigger_observation_queue_task(),
+            name="dynamic-scheduler-trigger-obs-queue",
         )
 
         await self._discover_running_services()
@@ -376,12 +383,16 @@ async def shutdown(self):
         self._to_observe = {}
 
         if self._scheduler_task is not None:
-            await self._scheduler_task
+            self._scheduler_task.cancel()
+            with contextlib.suppress(asyncio.CancelledError):
+                await self._scheduler_task
             self._scheduler_task = None
 
         if self._trigger_observation_queue_task is not None:
             await self._trigger_observation_queue.put(None)
-            await self._trigger_observation_queue_task
+            self._trigger_observation_queue_task.cancel()
+            with contextlib.suppress(asyncio.CancelledError):
+                await self._trigger_observation_queue_task
             self._trigger_observation_queue_task = None
             self._trigger_observation_queue = Queue()
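The shutdown fix is the substantive part here: a bare `await task` on a task that never returns blocks shutdown forever, whereas `cancel()` followed by an awaited, suppressed `CancelledError` terminates it deterministically. A self-contained sketch of the pattern, where the `worker` coroutine is an illustrative stand-in for the scheduler coroutines:

# Sketch of the named-task + cancel-and-suppress shutdown pattern used above.
# `worker` is an illustrative stand-in, not code from the commit.
import asyncio
import contextlib


async def worker() -> None:
    while True:  # runs until cancelled, like the scheduler loop
        await asyncio.sleep(1)


async def main() -> None:
    # naming tasks (Python 3.8+) makes them identifiable in asyncio.all_tasks()
    task = asyncio.create_task(worker(), name="dynamic-scheduler")
    await asyncio.sleep(0.1)

    task.cancel()  # request cancellation...
    with contextlib.suppress(asyncio.CancelledError):
        await task  # ...then wait for it, swallowing the expected error


asyncio.run(main())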

services/director-v2/tests/integration/02/test__dynamic_sidecar_nodeports_integration.py

Lines changed: 6 additions & 11 deletions

@@ -66,7 +66,6 @@
     assert_start_service,
     assert_stop_service,
     ensure_network_cleanup,
-    get_director_v0_patched_url,
     is_legacy,
     patch_dynamic_service_url,
 )
@@ -118,6 +117,7 @@ def minimal_configuration(  # pylint:disable=too-many-arguments
     ensure_swarm_and_networks: None,
 ) -> Iterator[None]:
     with postgres_db.connect() as conn:
+        # pylint: disable=no-value-for-parameter
         conn.execute(comp_tasks.delete())
         conn.execute(comp_pipeline.delete())
     yield
@@ -530,7 +530,6 @@ async def _wait_for_dynamic_services_to_be_running(
         *(
             assert_start_service(
                 director_v2_client=director_v2_client,
-                director_v0_url=director_v0_url,
                 user_id=user_id,
                 project_id=str(current_study.uuid),
                 service_key=node.key,
@@ -553,12 +552,11 @@ async def _wait_for_dynamic_services_to_be_running(
         dynamic_services_urls[service_uuid] = dynamic_service_url
 
     await assert_all_services_running(
-        director_v2_client, director_v0_url, workbench=workbench_dynamic_services
+        director_v2_client, workbench=workbench_dynamic_services
     )
 
     await assert_services_reply_200(
         director_v2_client=director_v2_client,
-        director_v0_url=director_v0_url,
         workbench=workbench_dynamic_services,
     )
 
@@ -693,7 +691,6 @@ async def _assert_retrieve_completed(
 ) -> None:
     await assert_retrieve_service(
         director_v2_client=director_v2_client,
-        director_v0_url=director_v0_url,
         service_uuid=service_uuid,
     )
 
@@ -794,7 +791,6 @@ async def test_nodeports_integration(
        `docker` for both dynamic services
     7. finally check that all states for both dynamic services match
     """
-    director_v0_url = get_director_v0_patched_url(services_endpoint["director"])
 
     # STEP 1
 
@@ -804,7 +800,7 @@ async def test_nodeports_integration(
         str, str
     ] = await _wait_for_dynamic_services_to_be_running(
         director_v2_client=director_v2_client,
-        director_v0_url=director_v0_url,
+        director_v0_url=services_endpoint["director"],
         user_id=user_db["id"],
         workbench_dynamic_services=workbench_dynamic_services,
         current_study=current_study,
@@ -866,14 +862,14 @@ async def test_nodeports_integration(
 
     await _assert_retrieve_completed(
         director_v2_client=director_v2_client,
-        director_v0_url=director_v0_url,
+        director_v0_url=services_endpoint["director"],
         service_uuid=services_node_uuids.dy,
         dynamic_services_urls=dynamic_services_urls,
     )
 
     await _assert_retrieve_completed(
         director_v2_client=director_v2_client,
-        director_v0_url=director_v0_url,
+        director_v0_url=services_endpoint["director"],
         service_uuid=services_node_uuids.dy_compose_spec,
         dynamic_services_urls=dynamic_services_urls,
     )
@@ -911,7 +907,6 @@ async def test_nodeports_integration(
         *(
             assert_stop_service(
                 director_v2_client=director_v2_client,
-                director_v0_url=director_v0_url,
                 service_uuid=service_uuid,
             )
             for service_uuid in workbench_dynamic_services
@@ -940,7 +935,7 @@ async def test_nodeports_integration(
 
     await _wait_for_dynamic_services_to_be_running(
         director_v2_client=director_v2_client,
-        director_v0_url=director_v0_url,
+        director_v0_url=services_endpoint["director"],
         user_id=user_db["id"],
         workbench_dynamic_services=workbench_dynamic_services,
         current_study=current_study,
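Aside from dropping the `get_director_v0_patched_url` plumbing, the fixture gains a `# pylint: disable=no-value-for-parameter` because pylint misreads SQLAlchemy Core's table-level `delete()` as a call with missing arguments, a known false positive. A small self-contained sketch of that pattern; the table definition and in-memory engine are illustrative stand-ins for `comp_tasks`/`comp_pipeline` and the postgres fixture:

# Sketch: table-level DELETE with SQLAlchemy Core, which pylint flags
# as no-value-for-parameter; the table below is an illustrative stand-in.
import sqlalchemy as sa

metadata = sa.MetaData()
comp_tasks = sa.Table(
    "comp_tasks",
    metadata,
    sa.Column("task_id", sa.Integer, primary_key=True),
)

engine = sa.create_engine("sqlite://")  # in-memory DB just for the demo
metadata.create_all(engine)

with engine.connect() as conn:
    # pylint: disable=no-value-for-parameter
    conn.execute(comp_tasks.delete())  # emits: DELETE FROM comp_tasks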
