Skip to content

Commit 4bf61c8

Browse files
bogdang989 authored and tetron committed
WIP InitialWorkdirRequirement array and null (#879)
* Allow staging array input in combination with other items
* Allow list of files in entry field
* Add InitialWorkDirRequirement conflicting names explanation
* Add InitialWorkDir allowed types
* Add InitialWorkDir allowed types
* Reformat listing types
* Add Directory[] to initwd description
* Fix isinstance list -> MutableSequence
* Improve resource error msg
* Fix parallel executor bug, was failing conformance tests.
* Add typecheck that 't' is a Mapping before checking for 'entry'
* More parallel executor work, found the actual bug. Closures in Python are weird and easy to mess up. Pull out runner() so it isn't affected when the value of "job" changes in the surrounding function.
* Shrink the default RAM request.
* Fix type annotation
1 parent 596aab6 commit 4bf61c8

File tree

5 files changed

+101
-57
lines changed

5 files changed

+101
-57
lines changed

cwltool/command_line_tool.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -428,16 +428,25 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
428428
ls = builder.do_eval(initialWorkdir["listing"])
429429
else:
430430
for t in initialWorkdir["listing"]:
431-
if "entry" in t:
432-
et = {u"entry": builder.do_eval(t["entry"], strip_whitespace=False)}
433-
if "entryname" in t:
434-
et["entryname"] = builder.do_eval(t["entryname"])
435-
else:
436-
et["entryname"] = None
437-
et["writable"] = t.get("writable", False)
438-
ls.append(et)
431+
if isinstance(t, Mapping) and "entry" in t:
432+
entry_exp = builder.do_eval(t["entry"], strip_whitespace=False)
433+
for entry in aslist(entry_exp):
434+
et = {u"entry": entry}
435+
if "entryname" in t:
436+
et["entryname"] = builder.do_eval(t["entryname"])
437+
else:
438+
et["entryname"] = None
439+
et["writable"] = t.get("writable", False)
440+
if et[u"entry"]:
441+
ls.append(et)
439442
else:
440-
ls.append(builder.do_eval(t))
443+
initwd_item = builder.do_eval(t)
444+
if not initwd_item:
445+
continue
446+
if isinstance(initwd_item, MutableSequence):
447+
ls.extend(initwd_item)
448+
else:
449+
ls.append(initwd_item)
441450
for i, t in enumerate(ls):
442451
if "entry" in t:
443452
if isinstance(t["entry"], string_types):

cwltool/executors.py

Lines changed: 65 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,26 @@ def select_resources(self, request, runtime_context): # pylint: disable=unused-
231231

232232
return result
233233

234+
def _runner(self, job, runtime_context):
    """Run one job on the current thread, then release what it reserved.

    Used as the ``target`` of a worker ``threading.Thread``.  The job is
    passed in as an explicit argument instead of being captured from the
    scheduling function's scope, so the thread keeps working on the job it
    was started for even if the caller later rebinds its own ``job``
    variable.

    :param job: object whose ``run(runtime_context)`` method is invoked.
    :param runtime_context: passed through to ``job.run``; its
        ``workflow_eval_lock`` is held while the executor's bookkeeping
        is updated and is used to wake any waiters afterwards.

    Exceptions raised by ``job.run`` are not propagated: they are logged
    and appended to ``self.exceptions`` (non-``WorkflowException`` errors
    are wrapped in a ``WorkflowException`` first).
    """
    try:
        job.run(runtime_context)
    except WorkflowException as err:
        _logger.exception("Got workflow error")
        self.exceptions.append(err)
    except Exception as err:  # pylint: disable=broad-except
        # Wrap unexpected errors so self.exceptions only ever holds
        # WorkflowException instances.
        _logger.exception("Got workflow error")
        self.exceptions.append(WorkflowException(Text(err)))
    finally:
        # Always undo the bookkeeping, whether the job succeeded or failed.
        with runtime_context.workflow_eval_lock:
            self.threads.remove(threading.current_thread())
            if isinstance(job, JobBase):
                # Only JobBase instances had ram/cores accounted for.
                self.allocated_ram -= job.builder.resources["ram"]
                self.allocated_cores -= job.builder.resources["cores"]
            # notify_all() is the non-deprecated spelling of notifyAll();
            # it must be called while the lock is held.
            runtime_context.workflow_eval_lock.notify_all()
251+
234252
def run_job(self,
235-
job, # type: Union[JobBase, WorkflowJob]
253+
job, # type: Union[JobBase, WorkflowJob, None]
236254
runtime_context # type: RuntimeContext
237255
): # type: (...) -> None
238256
""" Execute a single Job in a seperate thread. """
@@ -241,50 +259,51 @@ def run_job(self,
241259
with self.pending_jobs_lock:
242260
self.pending_jobs.append(job)
243261

244-
while self.pending_jobs:
245-
with self.pending_jobs_lock:
246-
job = self.pending_jobs[0]
247-
if isinstance(job, JobBase) \
248-
and \
249-
((self.allocated_ram + job.builder.resources["ram"])
250-
> self.max_ram
251-
or (self.allocated_cores + job.builder.resources["cores"])
252-
> self.max_cores):
253-
_logger.warning(
254-
'Job "%s" requested more resources (%s) than are '
255-
'available (max ram is %f, max cores is %f)',
256-
job.name, job.builder.resources, self.max_ram,
257-
self.max_cores)
258-
return
262+
with self.pending_jobs_lock:
263+
n = 0
264+
while (n+1) <= len(self.pending_jobs):
265+
job = self.pending_jobs[n]
266+
if isinstance(job, JobBase):
267+
if ((job.builder.resources["ram"])
268+
> self.max_ram
269+
or (job.builder.resources["cores"])
270+
> self.max_cores):
271+
_logger.error(
272+
'Job "%s" cannot be run, requests more resources (%s) '
273+
'than available on this host (max ram %d, max cores %d',
274+
job.name, job.builder.resources,
275+
self.allocated_ram,
276+
self.allocated_cores,
277+
self.max_ram,
278+
self.max_cores)
279+
self.pending_jobs.remove(job)
280+
return
281+
282+
if ((self.allocated_ram + job.builder.resources["ram"])
283+
> self.max_ram
284+
or (self.allocated_cores + job.builder.resources["cores"])
285+
> self.max_cores):
286+
_logger.debug(
287+
'Job "%s" cannot run yet, resources (%s) are not '
288+
'available (already allocated ram is %d, allocated cores is %d, '
289+
'max ram %d, max cores %d',
290+
job.name, job.builder.resources,
291+
self.allocated_ram,
292+
self.allocated_cores,
293+
self.max_ram,
294+
self.max_cores)
295+
n += 1
296+
continue
297+
298+
thread = threading.Thread(target=self._runner, args=(job, runtime_context))
299+
thread.daemon = True
300+
self.threads.add(thread)
301+
if isinstance(job, JobBase):
302+
self.allocated_ram += job.builder.resources["ram"]
303+
self.allocated_cores += job.builder.resources["cores"]
304+
thread.start()
259305
self.pending_jobs.remove(job)
260306

261-
def runner():
262-
""" Job running thread. """
263-
try:
264-
job.run(runtime_context)
265-
except WorkflowException as err:
266-
_logger.exception("Got workflow error")
267-
self.exceptions.append(err)
268-
except Exception as err: # pylint: disable=broad-except
269-
_logger.exception("Got workflow error")
270-
self.exceptions.append(WorkflowException(Text(err)))
271-
finally:
272-
with runtime_context.workflow_eval_lock:
273-
self.threads.remove(threading.current_thread())
274-
if isinstance(job, JobBase):
275-
self.allocated_ram -= job.builder.resources["ram"]
276-
self.allocated_cores -= job.builder.resources["cores"]
277-
runtime_context.workflow_eval_lock.notifyAll()
278-
279-
thread = threading.Thread(target=runner)
280-
thread.daemon = True
281-
self.threads.add(thread)
282-
if isinstance(job, JobBase):
283-
self.allocated_ram += job.builder.resources["ram"]
284-
self.allocated_cores += job.builder.resources["cores"]
285-
thread.start()
286-
287-
288307
def wait_for_next_completion(self, runtime_context):
289308
# type: (RuntimeContext) -> None
290309
""" Wait for jobs to finish. """
@@ -324,6 +343,9 @@ def run_jobs(self,
324343
logger.error("Workflow cannot make any more progress.")
325344
break
326345

346+
self.run_job(None, runtime_context)
327347
while self.threads:
328348
self.wait_for_next_completion(runtime_context)
349+
self.run_job(None, runtime_context)
350+
329351
runtime_context.workflow_eval_lock.release()

cwltool/job.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,9 @@ def __init__(self,
202202
self.timelimit = None # type: Optional[int]
203203
self.networkaccess = False # type: bool
204204

205+
def __repr__(self):
    """Return a short debugging representation: ``CommandLineJob(<name>)``."""
    # Wrap the operand in a 1-tuple: a bare ``% self.name`` would unpack
    # ``name`` if it ever happened to be a tuple and raise TypeError.
    return "CommandLineJob(%s)" % (self.name,)
207+
205208
@abstractmethod
206209
def run(self,
207210
runtimeContext # type: RuntimeContext

cwltool/process.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -763,8 +763,8 @@ def evalResources(self, builder, runtimeContext):
763763
request = {
764764
"coresMin": 1,
765765
"coresMax": 1,
766-
"ramMin": 1024,
767-
"ramMax": 1024,
766+
"ramMin": 256,
767+
"ramMax": 256,
768768
"tmpdirMin": 1024,
769769
"tmpdirMax": 1024,
770770
"outdirMin": 1024,

cwltool/schemas/v1.1.0-dev1/CommandLineTool.yml

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -866,7 +866,17 @@ $graph:
866866
- name: listing
867867
type:
868868
- type: array
869-
items: ["null", File, Directory, Dirent, string, Expression]
869+
items:
870+
- "null"
871+
- File
872+
- type: array
873+
items:
874+
- File
875+
- Directory
876+
- Directory
877+
- Dirent
878+
- string
879+
- Expression
870880
- string
871881
- Expression
872882
jsonldPredicate:
@@ -875,8 +885,8 @@ $graph:
875885
The list of files or subdirectories that must be placed in the
876886
designated output directory prior to executing the command line tool.
877887
878-
May be an expression. If so, the expression return value must validate
879-
as `{type: array, items: ["null", File, Directory, Dirent]}`.
888+
May be an expression. If so, the expression return value must validate as
889+
`{type: array, items: ["null", File, File[], Directory, Directory[], Dirent]}`.
880890
881891
Files or Directories which are listed in the input parameters and
882892
appear in the `InitialWorkDirRequirement` listing must have their

0 commit comments

Comments
 (0)