Skip to content

Commit 6901d09

Browse files
authored
fix failing to close aiodocker connection (#1765)
1 parent c4e821e commit 6901d09

File tree

4 files changed

+124
-113
lines changed

4 files changed

+124
-113
lines changed

services/director/tests/fixtures/fake_services.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# pylint:disable=redefined-outer-name
33

44

5+
import asyncio
56
import json
67
import logging
78
import random
@@ -10,13 +11,12 @@
1011

1112
import pytest
1213
import requests
14+
from aiodocker import utils
1315
from aiodocker.docker import Docker
1416
from aiodocker.exceptions import DockerError
1517

1618
_logger = logging.getLogger(__name__)
1719

18-
import asyncio
19-
2020

2121
@pytest.fixture(scope="function")
2222
def push_services(loop, docker_registry, tmpdir):
@@ -98,7 +98,7 @@ async def _build_push_image(
9898
*,
9999
bad_json_format=False,
100100
): # pylint: disable=R0913
101-
docker = Docker()
101+
102102
# crate image
103103
service_description = _create_service_description(service_type, name, tag)
104104
docker_labels = _create_docker_labels(service_description, bad_json_format)
@@ -157,7 +157,9 @@ async def _build_push_image(
157157
await _create_base_image(docker_labels, image_tag)
158158

159159
# push image to registry
160+
docker = Docker()
160161
await docker.images.push(image_tag)
162+
await docker.close()
161163
# remove image from host
162164
# docker.images.remove(image_tag)
163165
return {
@@ -190,9 +192,6 @@ def _clean_registry(registry_url, list_of_images):
190192
response = requests.delete(url, headers=request_headers)
191193

192194

193-
from aiodocker import utils
194-
195-
196195
async def _create_base_image(labels, tag):
197196
dockerfile = """
198197
FROM alpine
@@ -206,6 +205,7 @@ async def _create_base_image(labels, tag):
206205
base_docker_image = await docker.images.build(
207206
fileobj=tar_obj, encoding="gzip", rm=True, labels=labels, tag=tag
208207
)
208+
await docker.close()
209209
return base_docker_image[0]
210210

211211

services/sidecar/src/simcore_service_sidecar/executor.py

Lines changed: 114 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,18 @@
66
from pathlib import Path
77
from typing import Dict, Optional
88

9+
import aiopg
910
import attr
11+
from aiodocker import Docker
12+
from aiodocker.containers import DockerContainer
13+
from aiodocker.exceptions import DockerContainerError, DockerError
14+
from packaging import version
15+
from tenacity import retry, stop_after_attempt
1016

11-
import aiodocker
12-
import aiopg
1317
from celery.utils.log import get_task_logger
14-
from packaging import version
1518
from servicelib.utils import fire_and_forget_task, logged_gather
1619
from simcore_sdk import node_data, node_ports
1720
from simcore_sdk.node_ports.dbmanager import DBManager
18-
from tenacity import retry, stop_after_attempt
1921

2022
from . import config, exceptions
2123
from .boot_mode import get_boot_mode
@@ -98,7 +100,7 @@ async def run(self):
98100
except asyncio.CancelledError:
99101
await self._post_messages(LogType.LOG, "[sidecar]...task cancelled")
100102
raise
101-
except (aiodocker.exceptions.DockerError, exceptions.SidecarException) as exc:
103+
except (DockerError, exceptions.SidecarException) as exc:
102104
await self._post_messages(
103105
LogType.LOG, f"[sidecar]...task failed: {str(exc)}"
104106
)
@@ -215,40 +217,37 @@ async def _pull_image(self):
215217
config.DOCKER_PASSWORD,
216218
)
217219
try:
218-
docker_client: aiodocker.Docker = aiodocker.Docker()
219-
await self._post_messages(
220-
LogType.LOG,
221-
f"[sidecar]Pulling {self.task.image['name']}:{self.task.image['tag']}...",
222-
)
223-
await docker_client.images.pull(
224-
docker_image,
225-
auth={
226-
"username": config.DOCKER_USER,
227-
"password": config.DOCKER_PASSWORD,
228-
},
229-
)
230-
231-
# get integration version
232-
image_cfg = await docker_client.images.inspect(docker_image)
233-
# NOTE: old services did not have that label
234-
if "io.simcore.integration-version" in image_cfg["Config"]["Labels"]:
235-
self.integration_version = version.parse(
236-
json.loads(
237-
image_cfg["Config"]["Labels"]["io.simcore.integration-version"]
238-
)["integration-version"]
220+
async with Docker() as docker_client:
221+
await self._post_messages(
222+
LogType.LOG,
223+
f"[sidecar]Pulling {self.task.image['name']}:{self.task.image['tag']}...",
224+
)
225+
await docker_client.images.pull(
226+
docker_image,
227+
auth={
228+
"username": config.DOCKER_USER,
229+
"password": config.DOCKER_PASSWORD,
230+
},
239231
)
240232

241-
except aiodocker.exceptions.DockerError:
233+
# get integration version
234+
image_cfg = await docker_client.images.inspect(docker_image)
235+
# NOTE: old services did not have that label
236+
if "io.simcore.integration-version" in image_cfg["Config"]["Labels"]:
237+
self.integration_version = version.parse(
238+
json.loads(
239+
image_cfg["Config"]["Labels"][
240+
"io.simcore.integration-version"
241+
]
242+
)["integration-version"]
243+
)
244+
245+
except DockerError:
242246
msg = f"Failed to pull image '{docker_image}'"
243247
log.exception(msg)
244248
raise
245249

246-
async def _run_container(self):
247-
# pylint: disable=too-many-statements, too-many-branches
248-
start_time = time.perf_counter()
249-
container = None
250-
docker_image = f"{config.DOCKER_REGISTRY}/{self.task.image['name']}:{self.task.image['tag']}"
251-
250+
async def _create_container_config(self, docker_image: str) -> Dict:
252251
# NOTE: Env/Binds for log folder is only necessary for integraion "0"
253252
env_vars = [
254253
f"{name.upper()}_FOLDER=/{name}/{self.task.job_id}"
@@ -293,89 +292,105 @@ async def _run_container(self):
293292
],
294293
},
295294
}
296-
log.debug(
297-
"Running image %s with config %s", docker_image, docker_container_config
295+
return docker_container_config
296+
297+
async def _start_monitoring_container(
298+
self, container: DockerContainer
299+
) -> asyncio.Future:
300+
log_file = self.shared_folders.log_folder / "log.dat"
301+
if self.integration_version == version.parse("0.0.0"):
302+
# touch output file, so it's ready for the container (v0)
303+
log_file.touch()
304+
305+
log_processor_task = fire_and_forget_task(
306+
monitor_logs_task(log_file, self._post_messages)
307+
)
308+
return log_processor_task
309+
log_processor_task = fire_and_forget_task(
310+
monitor_logs_task(container, self._post_messages, log_file)
298311
)
312+
return log_processor_task
313+
314+
async def _run_container(self):
315+
start_time = time.perf_counter()
316+
docker_image = f"{config.DOCKER_REGISTRY}/{self.task.image['name']}:{self.task.image['tag']}"
317+
container_config = await self._create_container_config(docker_image)
318+
299319
# volume paths for car container (w/o prefix)
300320
result = "FAILURE"
301321
log_processor_task = None
302322
try:
303-
docker_client: aiodocker.Docker = aiodocker.Docker()
304-
await self._post_messages(
305-
LogType.LOG,
306-
f"[sidecar]Running {self.task.image['name']}:{self.task.image['tag']}...",
307-
)
308-
container = await docker_client.containers.create(
309-
config=docker_container_config
310-
)
311-
# start monitoring logs
312-
log_file = self.shared_folders.log_folder / "log.dat"
313-
if self.integration_version == version.parse("0.0.0"):
314-
# touch output file, so it's ready for the container (v0)
315-
log_file.touch()
316-
317-
log_processor_task = fire_and_forget_task(
318-
monitor_logs_task(log_file, self._post_messages)
323+
async with Docker() as docker_client:
324+
await self._post_messages(
325+
LogType.LOG,
326+
f"[sidecar]Running {self.task.image['name']}:{self.task.image['tag']}...",
319327
)
320-
else:
321-
log_processor_task = fire_and_forget_task(
322-
monitor_logs_task(container, self._post_messages, log_file)
328+
container = await docker_client.containers.create(
329+
config=container_config
330+
)
331+
log_processor_task = await self._start_monitoring_container(container)
332+
# start the container
333+
await container.start()
334+
# indicate container is started
335+
await self.rabbit_mq.post_instrumentation_message(
336+
{
337+
"metrics": "service_started",
338+
"user_id": self.user_id,
339+
"project_id": self.task.project_id,
340+
"service_uuid": self.task.node_id,
341+
"service_type": "COMPUTATIONAL",
342+
"service_key": self.task.image["name"],
343+
"service_tag": self.task.image["tag"],
344+
}
323345
)
324-
# start the container
325-
await container.start()
326-
# indicate container is started
327-
await self.rabbit_mq.post_instrumentation_message(
328-
{
329-
"metrics": "service_started",
330-
"user_id": self.user_id,
331-
"project_id": self.task.project_id,
332-
"service_uuid": self.task.node_id,
333-
"service_type": "COMPUTATIONAL",
334-
"service_key": self.task.image["name"],
335-
"service_tag": self.task.image["tag"],
336-
}
337-
)
338346

339-
# wait until the container finished, either success or fail or timeout
340-
container_data = await container.show()
341-
while container_data["State"]["Running"]:
342-
await asyncio.sleep(2)
343-
# reload container data
347+
# wait until the container finished, either success or fail or timeout
344348
container_data = await container.show()
345-
if (
346-
(time.perf_counter() - start_time) > config.SERVICES_TIMEOUT_SECONDS
347-
and config.SERVICES_TIMEOUT_SECONDS > 0
348-
):
349-
log.error(
350-
"Running container timed-out after %ss and will be stopped now\nlogs: %s",
351-
config.SERVICES_TIMEOUT_SECONDS,
352-
container.log(stdout=True, stderr=True),
349+
TIME_TO_NEXT_PERDIODIC_CHECK_SECS = 2
350+
while container_data["State"]["Running"]:
351+
await asyncio.sleep(TIME_TO_NEXT_PERDIODIC_CHECK_SECS)
352+
# reload container data
353+
container_data = await container.show()
354+
if (
355+
(time.perf_counter() - start_time)
356+
> config.SERVICES_TIMEOUT_SECONDS
357+
and config.SERVICES_TIMEOUT_SECONDS > 0
358+
):
359+
log.error(
360+
"Running container timed-out after %ss and will be stopped now\nlogs: %s",
361+
config.SERVICES_TIMEOUT_SECONDS,
362+
container.log(stdout=True, stderr=True),
363+
)
364+
await container.stop()
365+
break
366+
367+
# reload container data to check the error code with latest info
368+
container_data = await container.show()
369+
if container_data["State"]["ExitCode"] > 0:
370+
exc = exceptions.SidecarException(
371+
f"{docker_image} completed with error code {container_data['State']['ExitCode']}:\n {container_data['State']['Error']}\n:Last logs:\n{container.logs(stdout=True, stderr=True, tail=10)}"
353372
)
354-
await container.stop()
355-
break
356-
357-
# reload container data to check the error code with latest info
358-
container_data = await container.show()
359-
if container_data["State"]["ExitCode"] > 0:
360-
raise exceptions.SidecarException(
361-
f"{docker_image} completed with error code {container_data['State']['ExitCode']}:\n {container_data['State']['Error']}\n:Last logs:\n{container.logs(stdout=True, stderr=True, tail=10)}"
362-
)
363-
# ensure progress 1.0 is sent
364-
await self._post_messages(LogType.PROGRESS, "1.0")
365-
result = "SUCCESS"
366-
log.info("%s completed with successfully!", docker_image)
367-
except aiodocker.exceptions.DockerContainerError:
373+
# clean up the container
374+
await container.delete(force=True)
375+
raise exc
376+
# clean up the container
377+
await container.delete(force=True)
378+
# ensure progress 1.0 is sent
379+
await self._post_messages(LogType.PROGRESS, "1.0")
380+
result = "SUCCESS"
381+
log.info("%s completed with successfully!", docker_image)
382+
except DockerContainerError:
368383
log.exception(
369384
"Error while running %s with parameters %s",
370385
docker_image,
371-
docker_container_config,
386+
container_config,
372387
)
373388
raise
374-
except aiodocker.exceptions.DockerError:
389+
except DockerError:
375390
log.exception(
376391
"Unknown error while trying to run %s with parameters %s",
377392
docker_image,
378-
docker_container_config,
393+
container_config,
379394
)
380395
raise
381396
except asyncio.CancelledError:
@@ -385,12 +400,10 @@ async def _run_container(self):
385400
finally:
386401
stop_time = time.perf_counter()
387402
log.info("Running %s took %sseconds", docker_image, stop_time - start_time)
388-
if container:
389-
# clean up the container
390-
await container.delete(force=True)
391403
# stop monitoring logs now
392404
if log_processor_task:
393405
log_processor_task.cancel()
406+
await log_processor_task
394407
# instrumentation
395408
await self.rabbit_mq.post_instrumentation_message(
396409
{
@@ -404,8 +417,6 @@ async def _run_container(self):
404417
"result": result,
405418
}
406419
)
407-
if log_processor_task:
408-
await log_processor_task
409420

410421
async def _process_task_output(self):
411422
"""There will be some files in the /output

services/sidecar/src/simcore_service_sidecar/utils.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,9 @@ def assemble_celery_app(task_default_queue: str, rabbit_config: RabbitConfig) ->
141141

142142
async def get_volume_mount_point(volume_name: str) -> str:
143143
try:
144-
docker_client: aiodocker.Docker = aiodocker.Docker()
145-
volume_attributes = await DockerVolume(docker_client, volume_name).show()
146-
return volume_attributes["Mountpoint"]
144+
async with aiodocker.Docker() as docker_client:
145+
volume_attributes = await DockerVolume(docker_client, volume_name).show()
146+
return volume_attributes["Mountpoint"]
147147

148148
except aiodocker.exceptions.DockerError as err:
149149
raise SidecarException(

services/sidecar/tests/integration/test_sidecar.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ async def rabbit_message_handler(message: aio_pika.IncomingMessage):
371371
for key in pipeline_cfg:
372372
dag.extend(pipeline_cfg[key]["next"])
373373
assert next_task_nodes == dag
374-
374+
await asyncio.sleep(5) # wait a little bit for logs to come in
375375
_assert_incoming_data_logs(
376376
list(pipeline_cfg.keys()),
377377
incoming_data,

0 commit comments

Comments
 (0)