@@ -42,8 +42,10 @@ def ListWorkflows(body=None):
42
42
43
43
api = get_api ()
44
44
45
- requests = api .container_requests ().list (filters = [["requesting_container_uuid" , "=" , None ]]).execute ()
46
- containers = api .containers ().list (filters = [["uuid" , "in" , [w ["container_uuid" ] for w in requests ["items" ]]]]).execute ()
45
+ requests = api .container_requests ().list (filters = [["requesting_container_uuid" , "=" , None ]],
46
+ select = ["uuid" , "command" , "container_uuid" ]).execute ()
47
+ containers = api .containers ().list (filters = [["uuid" , "in" , [w ["container_uuid" ] for w in requests ["items" ]]]],
48
+ select = ["uuid" , "state" ]).execute ()
47
49
48
50
uuidmap = {c ["uuid" ]: statemap [c ["state" ]] for c in containers ["items" ]}
49
51
@@ -60,14 +62,16 @@ def RunWorkflow(body):
60
62
return
61
63
62
64
env = {
65
+ "PATH" : os .environ ["PATH" ],
63
66
"ARVADOS_API_HOST" : os .environ ["ARVADOS_API_HOST" ],
64
67
"ARVADOS_API_TOKEN" : connexion .request .headers ['Authorization' ],
65
68
"ARVADOS_API_HOST_INSECURE" : os .environ .get ("ARVADOS_API_HOST_INSECURE" , "false" )
66
69
}
67
70
with tempfile .NamedTemporaryFile () as inputtemp :
68
- json .dump (request ["workflow_params" ], inputtemp )
69
- workflow_id = subprocess .check_output (["arvados-cwl-runner" , "--submit" , "--no-wait" ,
70
- request .get ("workflow_url" ), inputtemp .name ], env = env )
71
+ json .dump (body ["workflow_params" ], inputtemp )
72
+ inputtemp .flush ()
73
+ workflow_id = subprocess .check_output (["arvados-cwl-runner" , "--submit" , "--no-wait" , "--api=containers" ,
74
+ body .get ("workflow_url" ), inputtemp .name ], env = env ).strip ()
71
75
return {"workflow_id" : workflow_id }
72
76
73
77
def visit (d , op ):
@@ -102,7 +106,7 @@ def keepref(d):
102
106
with c .open ("stderr.txt" ) as f :
103
107
stderr = f .read ()
104
108
105
- return {
109
+ r = {
106
110
"workflow_id" : request ["uuid" ],
107
111
"request" : {},
108
112
"state" : statemap [container ["state" ]],
@@ -111,19 +115,24 @@ def keepref(d):
111
115
"startTime" : "" ,
112
116
"endTime" : "" ,
113
117
"stdout" : "" ,
114
- "stderr" : stderr ,
115
- "exitCode" : container ["exit_code" ]
118
+ "stderr" : stderr
116
119
},
117
120
"task_logs" : [],
118
121
"outputs" : outputobj
119
122
}
123
+ if container ["exit_code" ] is not None :
124
+ r ["workflow_log" ]["exitCode" ] = container ["exit_code" ]
125
+ return r
120
126
121
127
122
128
def CancelJob (workflow_id ):
123
- job = Workflow ( workflow_id )
124
- job . cancel ()
125
- return {"workflow_id" : workflow_id }
129
+ api = get_api ( )
130
+ request = api . container_requests (). update ( body = { "priority" : 0 }). execute ()
131
+ return {"workflow_id" : request [ "uuid" ] }
126
132
127
133
def GetWorkflowStatus (workflow_id ):
128
- job = Workflow (workflow_id )
129
- return job .getstatus ()
134
+ api = get_api ()
135
+ request = api .container_requests ().get (uuid = workflow_id ).execute ()
136
+ container = api .containers ().get (uuid = request ["container_uuid" ]).execute ()
137
+ return {"workflow_id" : request ["uuid" ],
138
+ "state" : statemap [container ["state" ]]}
0 commit comments