Skip to content

Commit aab35b6

Browse files
authored
add async pull_image (#27)
1 parent 10b9612 commit aab35b6

File tree

2 files changed

+28
-36
lines changed

2 files changed

+28
-36
lines changed

chainlink/__init__.py

Lines changed: 27 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import concurrent.futures
44
import logging
55
import tempfile
6-
import threading
76

87
import docker
98
import docker.errors
@@ -18,20 +17,22 @@ class Chainlink:
1817
A utility for running docker containers sequentially
1918
"""
2019

21-
def __init__(self, stages, workdir="/tmp"):
20+
def __init__(self, stages, workdir="/tmp", max_workers=4):
2221
self.client = docker.from_env()
2322
self.stages = stages
2423
self.workdir = workdir
25-
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
26-
self._pull_status = {}
27-
self._pull_images()
24+
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers)
2825

29-
# sync version
30-
def run(self, environ={}):
31-
return asyncio.get_event_loop().run_until_complete(self.run_async(environ))
26+
def run(self, *args, **kwargs):
27+
return asyncio.get_event_loop().run_until_complete(
28+
self.run_async(*args, **kwargs)
29+
)
3230

3331
async def run_async(self, environ={}):
3432
results = []
33+
34+
await self._pull_images()
35+
3536
with tempfile.TemporaryDirectory(dir=self.workdir) as mount:
3637
logger.info("using {} for temporary job directory".format(mount))
3738

@@ -46,43 +47,34 @@ async def run_async(self, environ={}):
4647

4748
return results
4849

49-
def _pull_images(self):
50+
async def _pull_images(self):
5051
images = set([stage["image"] for stage in self.stages])
51-
threads = []
52+
tasks = []
5253

5354
for image in images:
5455
logger.debug("pulling image '{}'".format(image))
55-
t = threading.Thread(
56-
target=self._pull_image, args=(self.client, image, self._pull_status)
57-
)
58-
t.start()
59-
threads.append(t)
60-
for t in threads:
61-
t.join()
62-
for image in images:
63-
if not self._pull_status.get(image):
64-
raise ValueError("Failed to pull all images")
6556

66-
@staticmethod
67-
def _pull_image(client, image, status):
68-
try:
69-
client.images.pull(image)
70-
status[image] = True
71-
return
72-
except docker.errors.NotFound:
73-
logger.debug("image '{}' not found on Docker Hub".format(image))
74-
except docker.errors.APIError as err:
75-
logger.debug("Docker API Error: {}".format(err))
76-
return
57+
tasks.append(self._pull_image(image))
7758

7859
try:
79-
client.images.get(image)
80-
status[image] = True
60+
await asyncio.gather(*tasks)
8161
except docker.errors.ImageNotFound:
8262
logger.error("image '{}' not found remotely or locally".format(image))
8363
except docker.errors.APIError as err:
84-
logger.debug("Docker API Error: {}".format(err))
85-
return
64+
logger.error("Docker API Error: {}".format(err))
65+
66+
async def _pull_image(self, image):
67+
def wait():
68+
try:
69+
return self.client.images.pull(image)
70+
except docker.errors.NotFound:
71+
# if not found on docker hub, try locally
72+
logger.info(
73+
"image '{}' not found on Docker Hub, fetching locally".format(image)
74+
)
75+
return self.client.images.get(image)
76+
77+
return await asyncio.get_event_loop().run_in_executor(self._executor, wait)
8678

8779
async def _run_stage(self, stage, mount, environ):
8880
environ = {**environ, **stage.get("env", {})}

tests/integration/basic.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,4 @@ def test_basic_chain(self):
2828

2929
def test_no_such_image(self):
3030
with self.assertRaises(Exception):
31-
Chainlink(stages_2)
31+
Chainlink(stages_2).run({})

0 commit comments

Comments
 (0)