Skip to content

Commit 7c9c19b

Browse files
authored
Expose async interface for chainlink.run (#19)
* expose async interface for chainlink.run * fix formatting issue * update README.md * fix coverage * fix test * fix formatting
1 parent a9545b2 commit 7c9c19b

File tree

3 files changed

+33
-23
lines changed

3 files changed

+33
-23
lines changed

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ Note that all images needed to run the specified stages are pulled in parallel d
6060
#### Run
6161

6262
```
63-
run(self, environ={}):
63+
def run(self, environ={})
64+
async def run_async(self, environ={})
6465
```
6566

6667
The `Chainlink` run function takes a base environment (`environ`) and executes each container specified by `stages` during construction in sequence. If a stage fails, then no subsequent stages will be run.
@@ -86,6 +87,8 @@ The run function returns a list of object, an example of which is annotated belo
8687

8788
Note that the returned list will have the same number of elements as there are stages, with element corresponding to the stage with the same index.
8889

90+
`run_async` is an async version of `run`.
91+
8992
### Cross-Stage Communication
9093

9194
A single directory is mounted at `/job` in each container before it is run, and contents in this `/job` directory are persisted across stages.

chainlink/__init__.py

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,18 @@ def __init__(self, stages, workdir="/tmp"):
2626
self._pull_status = {}
2727
self._pull_images()
2828

29+
# sync version
2930
def run(self, environ={}):
31+
return asyncio.get_event_loop().run_until_complete(self.run_async(environ))
32+
33+
async def run_async(self, environ={}):
3034
results = []
3135
with tempfile.TemporaryDirectory(dir=self.workdir) as mount:
3236
logger.info("using {} for temporary job directory".format(mount))
3337

3438
for (idx, stage) in enumerate(self.stages):
3539
logger.info("running stage {}".format(idx + 1))
36-
results.append(self._run_stage(stage, mount, environ))
40+
results.append(await self._run_stage(stage, mount, environ))
3741
if not results[-1]["success"]:
3842
logger.error("stage {} was unsuccessful".format(idx + 1))
3943
break
@@ -74,7 +78,7 @@ def _pull_image(client, image, status):
7478
except docker.errors.ImageNotFound:
7579
logger.error("image '{}' not found remotely or locally".format(image))
7680

77-
def _run_stage(self, stage, mount, environ):
81+
async def _run_stage(self, stage, mount, environ):
7882
environ = {**environ, **stage.get("env", {})}
7983
volumes = {mount: {"bind": "/job", "mode": "rw"}}
8084

@@ -94,7 +98,7 @@ def _run_stage(self, stage, mount, environ):
9498
"tty": True,
9599
}
96100

97-
container, killed = self._wait_for_stage(stage, options)
101+
container, killed = await self._wait_for_stage(stage, options)
98102
result = {
99103
"data": self.client.api.inspect_container(container.id)["State"],
100104
"killed": killed,
@@ -108,27 +112,23 @@ def _run_stage(self, stage, mount, environ):
108112

109113
return result
110114

111-
def _wait_for_stage(self, stage, options):
115+
async def _wait_for_stage(self, stage, options):
112116
timeout = stage.get("timeout", 30)
113117
container = self.client.containers.run(stage["image"], **options)
118+
event_loop = asyncio.get_event_loop()
114119

115-
# anonymous async runner for executing and waiting on container
116-
async def __run(loop, executor):
117-
try:
118-
await asyncio.wait_for(
119-
loop.run_in_executor(executor, container.wait), timeout=timeout
120-
)
121-
except asyncio.TimeoutError:
122-
logger.error("killing container after {} seconds".format(timeout))
123-
container.kill()
124-
return True
125-
return False
120+
# execute and wait
121+
try:
122+
await asyncio.wait_for(
123+
event_loop.run_in_executor(self._executor, container.wait),
124+
timeout=timeout,
125+
)
126+
except asyncio.TimeoutError:
127+
logger.error("killing container after {} seconds".format(timeout))
128+
container.kill()
129+
return container, True
126130

127-
event_loop = asyncio.get_event_loop()
128-
killed = event_loop.run_until_complete(
129-
asyncio.gather(__run(event_loop, self._executor))
130-
)[0]
131-
return container, killed
131+
return container, False
132132

133133
def __del__(self):
134134
self.client.close()

tests/integration/basic.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,30 @@
22

33
from chainlink import Chainlink
44

5-
stages = [
5+
stages_1 = [
66
{
77
"image": "alpine:3.5",
88
"entrypoint": ["env"],
99
"env": {"ASSIGNMENT": "", "NETID": "$NETID"},
1010
},
1111
{"image": "alpine:3.5", "entrypoint": ["sleep", "2"]},
1212
]
13+
14+
stages_2 = [{"image": "no-such-image:3.1415926535", "entrypoint": ["env"]}]
15+
1316
env = {"TEST": "testing", "SEMESTER": "sp18", "ASSIGNMENT": "mp1"}
1417

1518

1619
class TestBasicChaining(unittest.TestCase):
1720
def test_basic_chain(self):
18-
chain = Chainlink(stages)
21+
chain = Chainlink(stages_1)
1922
results = chain.run(env)
2023

2124
self.assertFalse(results[0]["killed"])
2225
self.assertTrue("TEST=testing" in results[0]["logs"]["stdout"].decode("utf-8"))
2326
self.assertFalse(results[0]["killed"])
2427
self.assertEqual(results[1]["data"]["ExitCode"], 0)
28+
29+
def test_no_such_image(self):
30+
with self.assertRaises(Exception):
31+
Chainlink(stages_2)

0 commit comments

Comments
 (0)