Skip to content

Commit 207c602

Browse files
author
William Krinsman
committed
implemented coroutines with asyncio
1 parent c0da1f3 commit 207c602

File tree

1 file changed

+25
-34
lines changed

1 file changed

+25
-34
lines changed

batchspawner/batchspawner.py

Lines changed: 25 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* job names instead of PIDs
1717
"""
1818
import pwd
19-
import os
19+
import os, asyncio
2020

2121
import xml.etree.ElementTree as ET
2222

@@ -177,53 +177,46 @@ def parse_job_id(self, output):
177177
def cmd_formatted_for_batch(self):
178178
return ' '.join(self.cmd + self.get_args())
179179

180-
@gen.coroutine
181-
def run_command(self, cmd, input=None, env=None):
182-
proc = Subprocess(cmd, shell=True, env=env, stdin=Subprocess.STREAM, stdout=Subprocess.STREAM,stderr=Subprocess.STREAM)
183-
inbytes = None
180+
async def run_command(self, cmd, input=None, env=None):
181+
proc = await asyncio.create_subprocess_shell(cmd, env=env,
182+
stdin=asyncio.subprocess.PIPE,
183+
stdout=asyncio.subprocess.PIPE,
184+
stderr=asyncio.subprocess.PIPE)
185+
inbytes=None
186+
184187
if input:
185-
inbytes = input.encode()
186-
try:
187-
yield proc.stdin.write(inbytes)
188-
except StreamClosedError as exp:
189-
# Apparently harmless
190-
pass
191-
proc.stdin.close()
192-
out = yield proc.stdout.read_until_close()
193-
eout = yield proc.stderr.read_until_close()
194-
proc.stdout.close()
195-
proc.stderr.close()
188+
inbytes=input.encode()
189+
190+
out, eout = await proc.communicate(input=inbytes)
191+
196192
eout = eout.decode().strip()
197-
try:
198-
err = yield proc.wait_for_exit()
199-
except CalledProcessError:
200-
self.log.error("Subprocess returned exitcode %s" % proc.returncode)
193+
194+
err = proc.returncode
195+
196+
if err != 0:
197+
self.log.error("Subprocess returned exitcode %s" % err)
201198
self.log.error(eout)
202199
raise RuntimeError(eout)
203-
if err != 0:
204-
return err # exit error?
205200
else:
206201
out = out.decode().strip()
207202
return out
208203

209-
@gen.coroutine
210-
def _get_batch_script(self, **subvars):
204+
async def _get_batch_script(self, **subvars):
211205
"""Format batch script from vars"""
212206
# Colud be overridden by subclasses, but mainly useful for testing
213207
return format_template(self.batch_script, **subvars)
214208

215-
@gen.coroutine
216-
def submit_batch_script(self):
209+
async def submit_batch_script(self):
217210
subvars = self.get_req_subvars()
218211
cmd = self.exec_prefix + ' ' + self.batch_submit_cmd
219212
cmd = format_template(cmd, **subvars)
220213
subvars['cmd'] = self.cmd_formatted_for_batch()
221214
if hasattr(self, 'user_options'):
222215
subvars.update(self.user_options)
223-
script = yield self._get_batch_script(**subvars)
216+
script = await self._get_batch_script(**subvars)
224217
self.log.info('Spawner submitting job using ' + cmd)
225218
self.log.info('Spawner submitted script:\n' + script)
226-
out = yield self.run_command(cmd, input=script, env=self.get_env())
219+
out = await self.run_command(cmd, input=script, env=self.get_env())
227220
try:
228221
self.log.info('Job submitted. cmd: ' + cmd + ' output: ' + out)
229222
self.job_id = self.parse_job_id(out)
@@ -238,8 +231,7 @@ def submit_batch_script(self):
238231
"and self.job_id as {job_id}."
239232
).tag(config=True)
240233

241-
@gen.coroutine
242-
def read_job_state(self):
234+
async def read_job_state(self):
243235
if self.job_id is None or len(self.job_id) == 0:
244236
# job not running
245237
self.job_status = ''
@@ -250,7 +242,7 @@ def read_job_state(self):
250242
cmd = format_template(cmd, **subvars)
251243
self.log.debug('Spawner querying job: ' + cmd)
252244
try:
253-
out = yield self.run_command(cmd)
245+
out = await self.run_command(cmd)
254246
self.job_status = out
255247
except Exception as e:
256248
self.log.error('Error querying job ' + self.job_id)
@@ -262,14 +254,13 @@ def read_job_state(self):
262254
help="Command to stop/cancel a previously submitted job. Formatted like batch_query_cmd."
263255
).tag(config=True)
264256

265-
@gen.coroutine
266-
def cancel_batch_job(self):
257+
async def cancel_batch_job(self):
267258
subvars = self.get_req_subvars()
268259
subvars['job_id'] = self.job_id
269260
cmd = self.exec_prefix + ' ' + self.batch_cancel_cmd
270261
cmd = format_template(cmd, **subvars)
271262
self.log.info('Cancelling job ' + self.job_id + ': ' + cmd)
272-
yield self.run_command(cmd)
263+
await self.run_command(cmd)
273264

274265
def load_state(self, state):
275266
"""load job_id from state"""

0 commit comments

Comments
 (0)