24
24
25
25
import xml .etree .ElementTree as ET
26
26
27
+ from enum import Enum
28
+
27
29
from jinja2 import Template
28
30
29
31
from tornado import gen
@@ -55,6 +57,11 @@ def format_template(template, *args, **kwargs):
55
57
return Template (template ).render (* args , ** kwargs )
56
58
return template .format (* args , ** kwargs )
57
59
60
+ class JobStatus (Enum ):
61
+ NOTQUEUED = 0
62
+ RUNNING = 1
63
+ PENDING = 2
64
+ UNKNOWN = 3
58
65
59
66
class BatchSpawnerBase (Spawner ):
60
67
"""Base class for spawners using resource manager batch job submission mechanisms
@@ -264,9 +271,8 @@ async def submit_batch_script(self):
264
271
265
272
async def query_job_status (self ):
266
273
if self .job_id is None or len (self .job_id ) == 0 :
267
- # job not running
268
274
self .job_status = ''
269
- return self . job_status
275
+ return JobStatus . NOTQUEUED
270
276
subvars = self .get_req_subvars ()
271
277
subvars ['job_id' ] = self .job_id
272
278
cmd = ' ' .join ((format_template (self .exec_prefix , ** subvars ),
@@ -279,8 +285,15 @@ async def query_job_status(self):
279
285
except Exception as e :
280
286
self .log .error ('Error querying job ' + self .job_id )
281
287
self .job_status = ''
282
- finally :
283
- return self .job_status
288
+
289
+ if self .state_isrunning ():
290
+ return JobStatus .RUNNING
291
+ elif self .state_ispending ():
292
+ return JobStatus .PENDING
293
+ elif self .state_isunknown ():
294
+ return JobStatus .UNKNOWN
295
+ else :
296
+ return JobStatus .NOTQUEUED
284
297
285
298
batch_cancel_cmd = Unicode ('' ,
286
299
help = "Command to stop/cancel a previously submitted job. Formatted like batch_query_cmd."
@@ -337,16 +350,10 @@ def state_gethost(self):
337
350
338
351
async def poll (self ):
339
352
"""Poll the process"""
340
- if self .job_id is not None and len (self .job_id ) > 0 :
341
- await self .query_job_status ()
342
- if self .state_isrunning () or self .state_ispending () or self .state_isunknown ():
343
- return None
344
- else :
345
- self .clear_state ()
346
- return 1
347
-
348
- if not self .job_id :
349
- # no job id means it's not running
353
+ status = await self .query_job_status ()
354
+ if status in (JobStatus .PENDING , JobStatus .RUNNING , JobStatus .UNKNOWN ):
355
+ return None
356
+ else :
350
357
self .clear_state ()
351
358
return 1
352
359
@@ -371,18 +378,19 @@ async def start(self):
371
378
if len (self .job_id ) == 0 :
372
379
raise RuntimeError ("Jupyter batch job submission failure (no jobid in output)" )
373
380
while True :
374
- await self .query_job_status ()
375
- if self . state_isrunning () :
381
+ status = await self .query_job_status ()
382
+ if status == JobStatus . RUNNING :
376
383
break
384
+ elif status == JobStatus .PENDING :
385
+ self .log .debug ('Job ' + self .job_id + ' still pending' )
386
+ elif status == JobStatus .UNKNOWN :
387
+ self .log .debug ('Job ' + self .job_id + ' still unknown' )
377
388
else :
378
- if self .state_ispending ():
379
- self .log .debug ('Job ' + self .job_id + ' still pending' )
380
- else :
381
- self .log .warning ('Job ' + self .job_id + ' neither pending nor running.\n ' +
382
- self .job_status )
383
- raise RuntimeError ('The Jupyter batch job has disappeared'
384
- ' while pending in the queue or died immediately'
385
- ' after starting.' )
389
+ self .log .warning ('Job ' + self .job_id + ' neither pending nor running.\n ' +
390
+ self .job_status )
391
+ raise RuntimeError ('The Jupyter batch job has disappeared'
392
+ ' while pending in the queue or died immediately'
393
+ ' after starting.' )
386
394
await gen .sleep (self .startup_poll_interval )
387
395
388
396
self .ip = self .state_gethost ()
@@ -415,8 +423,8 @@ async def stop(self, now=False):
415
423
if now :
416
424
return
417
425
for i in range (10 ):
418
- await self .query_job_status ()
419
- if not self . state_isrunning () and not self . state_isunknown ( ):
426
+ status = await self .query_job_status ()
427
+ if not status in ( JobStatus . RUNNING , JobStatus . UNKNOWN ):
420
428
return
421
429
await gen .sleep (1.0 )
422
430
if self .job_id :
@@ -477,24 +485,15 @@ class BatchSpawnerRegexStates(BatchSpawnerBase):
477
485
478
486
def state_ispending (self ):
479
487
assert self .state_pending_re , "Misconfigured: define state_running_re"
480
- if self .job_status and re .search (self .state_pending_re , self .job_status ):
481
- return True
482
- else :
483
- return False
488
+ return self .job_status and re .search (self .state_pending_re , self .job_status )
484
489
485
490
def state_isrunning (self ):
486
491
assert self .state_running_re , "Misconfigured: define state_running_re"
487
- if self .job_status and re .search (self .state_running_re , self .job_status ):
488
- return True
489
- else :
490
- return False
492
+ return self .job_status and re .search (self .state_running_re , self .job_status )
491
493
492
494
def state_isunknown (self ):
493
495
assert self .state_unknown_re , "Misconfigured: define state_unknown_re"
494
- if self .job_status and re .search (self .state_unknown_re , self .job_status ):
495
- return True
496
- else :
497
- return False
496
+ return self .job_status and re .search (self .state_unknown_re , self .job_status )
498
497
499
498
def state_gethost (self ):
500
499
assert self .state_exechost_re , "Misconfigured: define state_exechost_re"
0 commit comments