@@ -16,20 +16,24 @@ def __init__(self, workflow_ID):
16
16
self .workdir = os .path .abspath (self .workflow_ID )
17
17
18
18
def run (self , path , inputobj ):
19
+ path = os .path .abspath (path )
20
+ os .mkdir (self .workdir )
19
21
outdir = os .path .join (self .workdir , "outdir" )
22
+ os .mkdir (outdir )
20
23
with open (os .path .join (self .workdir , "cwl.input.json" ), "w" ) as inputtemp :
21
- json .dump (inputtemp , inputobj )
24
+ json .dump (inputobj , inputtemp )
22
25
with open (os .path .join (self .workdir , "workflow_url" ), "w" ) as f :
23
26
f .write (path )
24
27
output = open (os .path .join (self .workdir , "cwl.output.json" ), "w" )
25
28
stderr = open (os .path .join (self .workdir , "stderr" ), "w" )
26
29
27
- proc = subprocess .Popen (["cwl-runner" , path , inputtemp .name ],
30
+ #proc = subprocess.Popen(["cwl-runner", path, inputtemp.name],
31
+ proc = subprocess .Popen (["cwltool" , path , inputtemp .name ],
28
32
stdout = output ,
29
33
stderr = stderr ,
30
34
close_fds = True ,
31
35
cwd = outdir )
32
- stdout .close ()
36
+ output .close ()
33
37
stderr .close ()
34
38
with open (os .path .join (self .workdir , "pid" ), "w" ) as pid :
35
39
pid .write (str (proc .pid ))
@@ -44,15 +48,20 @@ def getstate(self):
44
48
if os .path .exists (exc ):
45
49
with open (exc ) as f :
46
50
exit_code = int (f .read ())
47
- if exit_code == 0 :
48
- state = "Complete"
49
- else :
50
- state = "Failed"
51
51
else :
52
52
with open (os .path .join (self .workdir , "pid" ), "r" ) as pid :
53
53
pid = int (pid .read ())
54
54
(_pid , exit_status ) = os .waitpid (pid , os .WNOHANG )
55
- # record exit code
55
+ if _pid != 0 :
56
+ exit_code = exit_status >> 8
57
+ with open (exc , "w" ) as f :
58
+ f .write (str (exit_code ))
59
+ os .unlink (os .path .join (self .workdir , "pid" ))
60
+
61
+ if exit_code == 0 :
62
+ state = "Complete"
63
+ elif exit_code != - 1 :
64
+ state = "Failed"
56
65
57
66
return (state , exit_code )
58
67
@@ -67,7 +76,7 @@ def getstatus(self):
67
76
outputobj = None
68
77
if state == "Complete" :
69
78
with open (os .path .join (self .workdir , "cwl.output.json" ), "r" ) as outputtemp :
70
- outputtobj = json .load (outputtemp )
79
+ outputobj = json .load (outputtemp )
71
80
72
81
return {
73
82
"workflow_ID" : self .workflow_ID ,
@@ -81,14 +90,17 @@ def getstatus(self):
81
90
def getlog (self ):
82
91
state , exit_code = self .getstate ()
83
92
93
+ with open (os .path .join (self .workdir , "stderr" ), "r" ) as f :
94
+ stderr = f .read ()
95
+
84
96
return {
85
97
"workflow_ID" : self .workflow_ID ,
86
98
"log" : {
87
99
"cmd" : "" ,
88
100
"startTime" : "" ,
89
101
"endTime" : "" ,
90
102
"stdout" : "" ,
91
- "stderr" : "" ,
103
+ "stderr" : stderr ,
92
104
"exitCode" : exit_code
93
105
}
94
106
}
@@ -98,15 +110,16 @@ def cancel(self):
98
110
99
111
def GetWorkflowStatus (workflow_ID ):
100
112
job = Workflow (workflow_ID )
101
- job .getstatus ()
113
+ return job .getstatus ()
102
114
103
115
def GetWorkflowLog (workflow_ID ):
104
116
job = Workflow (workflow_ID )
105
- job .getlog ()
117
+ return job .getlog ()
106
118
107
119
def CancelWorkflow (workflow_ID ):
108
120
job = Workflow (workflow_ID )
109
121
job .cancel ()
122
+ return job .getstatus ()
110
123
111
124
def RunWorkflow (body ):
112
125
workflow_ID = uuid .uuid4 ().hex
0 commit comments