Commit a1ab910

AlexTate authored and mr-c committed
Adding a unit test. Two outcomes are measured:
1) Once a job has been terminated, all other parallel jobs should also terminate. In this test, the runtime of the workflow indicates whether the kill switch has been handled correctly. If the kill switch is successful then the workflow's runtime should be significantly shorter than sleep_time.
2) Outputs produced by a successful step should still be collected. In this case, the completed step is make_array.
To be frank, this test could be simplified by using a ToolTimeLimit requirement rather than process_roulette.cwl.
1 parent b52333d commit a1ab910

3 files changed: +150 -1 lines changed

tests/process_roulette.cwl

Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
+#!/usr/bin/env cwl-runner
+
+cwlVersion: v1.2
+class: CommandLineTool
+
+
+doc: |
+  This tool selects a random process whose associated command matches
+  search_str, terminates it, and reports the PID of the terminated process.
+  The search_str supports regex. Example search_strs:
+  - "sleep"
+  - "sleep 33"
+  - "sleep [0-9]+"
+
+
+baseCommand: [ 'bash', '-c' ]
+arguments:
+  - |
+    sleep $(inputs.delay)
+    pid=\$(ps -ef | grep '$(inputs.search_str)' | grep -v grep | awk '{print $2}' | shuf | head -n 1)
+    echo "$pid" | tee >(xargs kill -SIGTERM)
+inputs:
+  search_str:
+    type: string
+  delay:
+    type: int?
+    default: 3
+stdout: "pid.txt"
+outputs:
+  pid:
+    type: string
+    outputBinding:
+      glob: pid.txt
+      loadContents: true
+      outputEval: $(self[0].contents)
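
To make the selection logic of the bash one-liner easier to follow, here is a rough Python sketch of the same idea. It is not part of the commit. The names `pick_victim` and `Process` are hypothetical, the process table is passed in as plain `(pid, command)` pairs instead of being read from `ps -ef`, and nothing is actually killed. Note also that Python's `re` syntax is not identical to the basic regular expressions that `grep` uses by default, so patterns may behave differently between the two.

```python
import random
import re
from typing import Optional, Sequence, Tuple

# Hypothetical stand-in for one row of `ps -ef`: (pid, command line).
Process = Tuple[int, str]


def pick_victim(
    processes: Sequence[Process],
    search_str: str,
    rng: Optional[random.Random] = None,
) -> Optional[int]:
    """Return the PID of one randomly chosen process whose command matches search_str."""
    rng = rng or random.Random()
    pattern = re.compile(search_str)
    # Mirrors `grep '<search_str>' | grep -v grep`:
    # keep commands that match the pattern, drop anything mentioning grep.
    candidates = [
        pid for pid, cmd in processes
        if pattern.search(cmd) and "grep" not in cmd
    ]
    # Mirrors `shuf | head -n 1`: one random candidate, or None if nothing matched.
    return rng.choice(candidates) if candidates else None


if __name__ == "__main__":
    table = [
        (101, "sleep 33"),
        (102, "sleep 33"),
        (103, "sleep 5"),
        (104, "grep sleep 33"),
    ]
    print(pick_victim(table, "sleep 33"))
```

With the sample table, a search for `sleep 33` can only ever return 101 or 102: PID 103 does not match, and PID 104 is filtered out in the same way `grep -v grep` hides the grep process itself.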

tests/test_parallel.py

Lines changed: 22 additions & 1 deletion
@@ -1,9 +1,10 @@
 import json
+import time
 from pathlib import Path
 
 from cwltool.context import RuntimeContext
 from cwltool.executors import MultithreadedJobExecutor
-from cwltool.factory import Factory
+from cwltool.factory import Factory, WorkflowStatus
 
 from .util import get_data, needs_docker
 
@@ -29,3 +30,23 @@ def test_scattered_workflow() -> None:
     echo = factory.make(get_data(test_file))
     with open(get_data(job_file)) as job:
         assert echo(**json.load(job)) == {"out": ["foo one three", "foo two four"]}
+
+
+def test_on_error_kill() -> None:
+    test_file = "tests/wf/on-error_kill.cwl"
+    runtime_context = RuntimeContext()
+    runtime_context.on_error = "kill"
+    factory = Factory(MultithreadedJobExecutor(), None, runtime_context)
+    ks_test = factory.make(get_data(test_file))
+
+    # arbitrary test values
+    sleep_time = 33  # a "sufficiently large" timeout
+    n_sleepers = 5
+
+    try:
+        start_time = time.time()
+        ks_test(sleep_time=sleep_time)
+    except WorkflowStatus as e:
+        assert e.out == {"instructed_sleep_times": [sleep_time] * n_sleepers}
+        assert time.time() - start_time < sleep_time
+        print("sharty barty")
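
The timing assertion in this test relies on one idea: if the kill switch works, the whole run ends long before any sleeper would have finished on its own. The sketch below illustrates that idea with the standard library only. It is an assumption-laden stand-in, not cwltool's implementation: `run_with_kill_switch` is a hypothetical helper, the sleepers are plain Python child processes rather than CWL jobs, and the "failure" is simulated by terminating the first child.

```python
import subprocess
import sys
import time
from typing import List


def run_with_kill_switch(sleep_time: float, n_sleepers: int, delay: float) -> float:
    """Start n_sleepers sleeping children, kill one after `delay`, stop the rest.

    Returns the total elapsed wall-clock time in seconds.
    """
    # Each child is a Python process that sleeps, standing in for `sleep <sleep_time>`.
    cmd = [sys.executable, "-c", f"import time; time.sleep({sleep_time})"]
    start = time.monotonic()
    children: List[subprocess.Popen] = [
        subprocess.Popen(cmd) for _ in range(n_sleepers)
    ]

    time.sleep(delay)
    children[0].terminate()  # the simulated failure (the "roulette" victim)
    children[0].wait()

    # Kill switch: once one job has failed, terminate every job still running.
    for child in children[1:]:
        if child.poll() is None:
            child.terminate()
    for child in children[1:]:
        child.wait()

    return time.monotonic() - start


if __name__ == "__main__":
    elapsed = run_with_kill_switch(sleep_time=33, n_sleepers=5, delay=0.5)
    # Same style of check as the test: the run must end well before sleep_time.
    assert elapsed < 33
    print(f"finished in {elapsed:.1f}s")
```

Without the second loop that terminates the remaining children, the function would only return after roughly `sleep_time` seconds, which is the situation the `time.time() - start_time < sleep_time` assertion is meant to catch.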

tests/wf/on-error_kill.cwl

Lines changed: 93 additions & 0 deletions
@@ -0,0 +1,93 @@
+#!/usr/bin/env cwl-runner
+
+cwlVersion: v1.2
+class: Workflow
+requirements:
+  ScatterFeatureRequirement: {}
+  InlineJavascriptRequirement: {}
+  StepInputExpressionRequirement: {}
+
+
+doc: |
+  This workflow tests the optional argument --on-error kill.
+  MultithreadedJobExecutor() or --parallel should be used.
+  A successful run should:
+    1) Finish in (much) less than sleep_time seconds.
+    2) Return outputs produced by successful steps.
+
+
+inputs:
+  sleep_time: { type: int, default: 33 }
+  n_sleepers: { type: int?, default: 5 }
+
+
+steps:
+  make_array:
+    doc: |
+      This step produces an array of sleep_time values to be used
+      as inputs for the scatter_step. The array also serves as the
+      workflow output which should be collected despite the
+      kill switch triggered in the kill step below.
+    in: { sleep_time: sleep_time, n_sleepers: n_sleepers }
+    out: [ times ]
+    run:
+      class: ExpressionTool
+      inputs:
+        sleep_time: { type: int }
+        n_sleepers: { type: int }
+      outputs: { times: { type: "int[]" } }
+      expression: |
+        ${ return {"times": Array(inputs.n_sleepers).fill(inputs.sleep_time)} }
+
+  scatter_step:
+    doc: |
+      This step starts several parallel jobs that each sleep for
+      sleep_time seconds.
+    in:
+      time: make_array/times
+    scatter: time
+    out: [ ]
+    run:
+      class: CommandLineTool
+      baseCommand: sleep
+      inputs:
+        time: { type: int, inputBinding: { position: 1 } }
+      outputs: { }
+
+  kill:
+    doc: |
+      This step waits a few seconds and selects a random scatter_step job to kill.
+      When `--on-error kill` is used, the runner should respond by terminating all
+      remaining jobs and exiting. This means the workflow's overall runtime should be
+      much less than max(sleep_time). The input force_upstream_order ensures that
+      this step runs after make_array, and therefore roughly parallel to scatter_step.
+    in:
+      force_upstream_order: make_array/times
+      sleep_time: sleep_time
+      search_str:
+        valueFrom: $("sleep " + inputs.sleep_time)
+    out: [ pid ]
+    run: ../process_roulette.cwl
+
+  dangling_step:
+    doc: |
+      This step should never run. It confirms that additional jobs aren't
+      submitted and allowed to run to completion after the kill switch has
+      been set. The input force_downstream_order ensures that this step runs
+      after the kill step.
+    in:
+      force_downstream_order: kill/pid
+      time: sleep_time
+    out: [ ]
+    run:
+      class: CommandLineTool
+      baseCommand: sleep
+      inputs:
+        time: { type: int, inputBinding: { position: 1 } }
+      outputs: { }
+
+
+outputs:
+  instructed_sleep_times:
+    type: int[]
+    outputSource: make_array/times
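
The step docs explain that `force_upstream_order` and `force_downstream_order` exist only to pin the execution order. As an illustration, the dependency graph implied by the `in:` sections can be written down and sorted with Python's standard `graphlib` module (Python 3.9+). This is a simplified model for reading the workflow, not a description of how cwltool schedules jobs: real runners start each step as soon as its inputs are ready rather than in strict waves, and the `depends_on` mapping below is transcribed by hand.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# step -> set of steps whose outputs it consumes, read off the `in:` sections
depends_on = {
    "make_array": set(),
    "scatter_step": {"make_array"},  # time: make_array/times
    "kill": {"make_array"},          # force_upstream_order: make_array/times
    "dangling_step": {"kill"},       # force_downstream_order: kill/pid
}

sorter = TopologicalSorter(depends_on)
sorter.prepare()

waves = []
while sorter.is_active():
    # Steps whose upstream outputs are all available at this point.
    ready = sorted(sorter.get_ready())
    waves.append(ready)
    sorter.done(*ready)

for number, wave in enumerate(waves, start=1):
    print(number, wave)
```

In this model `kill` and `scatter_step` become ready together, right after `make_array`, which is what lets the kill step hit a sleeper that is still running. `dangling_step` only becomes ready after `kill`, so with `--on-error kill` it should never be started.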
