
Commit dfcb2f5

API upgrade. Tested (#49)
* Add API endpoint to create and trigger DAGs in the same call. Moved logic from clean_dag_run into cwl.py so it can be reused. Add --combine flag to test the new API endpoint. Possible risk: cleaning old dag runs can be too slow, because a new dag_run with the same dag_id and run_id can't be created until the previous one has been deleted.
* Add tests to Travis for the --combine parameter
* Add comments to functions
* Add progress field to the GET /dag_runs response
* Fix bug in progress calculation
* Refactor get_dag_runs to work faster
* Update docs
* Minor changes
1 parent aac4242 commit dfcb2f5
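
The progress field mentioned in the commit message is returned by GET /dag_runs for every dag_run. A minimal polling sketch, assuming the cwl-airflow API is reachable at http://127.0.0.1:8081 (the address, identifiers, and helper name below are placeholders, not part of this commit):

import time
import requests

API = "http://127.0.0.1:8081"  # assumed API address; adjust to your deployment

def wait_for_dag_run(dag_id, run_id, poll_interval=10):
    # Poll GET /dag_runs and report the 'progress' value (percent of succeeded tasks)
    # until the dag_run leaves the 'running' state.
    while True:
        r = requests.get(
            url=f"{API}/api/experimental/dag_runs",
            params={"dag_id": dag_id, "run_id": run_id}
        )
        r.raise_for_status()
        dag_runs = r.json()["dag_runs"]
        if not dag_runs:
            raise RuntimeError(f"No dag_run found for {dag_id} / {run_id}")
        dag_run = dag_runs[0]
        print(f"{dag_run['state']}: {dag_run['progress']}% of tasks succeeded")
        if dag_run["state"] != "running":
            return dag_run
        time.sleep(poll_interval)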

8 files changed (+327 / -183 lines)

.travis.yml

Lines changed: 5 additions & 1 deletion
@@ -66,6 +66,10 @@ jobs:
       tags: true
     - name: DAG with embedded workflow (just one test)
       script: cwl-airflow test --suite workflows/tests/conformance_tests.yaml --spin --range 1 --embed
+    - name: DAG with attached workflow using combined API call (just one test)
+      script: cwl-airflow test --suite workflows/tests/conformance_tests.yaml --spin --range 1 --combine
+    - name: DAG with embedded workflow using combined API call (just one test)
+      script: cwl-airflow test --suite workflows/tests/conformance_tests.yaml --spin --range 1 --embed --combine
     - name: Test of `init --upgrade`
       before_install:
         - mkdir -p ~/airflow/dags
@@ -115,7 +119,7 @@ script: cwl-airflow test --suite workflows/tests/conformance_tests.yaml --spin -
 branches:
   only:
     - master
-    - /*_devel/
+    - /^*_devel$/
     - /^([1-9]\d*!)?(0|[1-9]\d*)(\.(0|[1-9]\d*))*((a|b|rc)(0|[1-9]\d*))?(\.post(0|[1-9]\d*))?(\.dev(0|[1-9]\d*))?$/
 
 notifications:

cwl_airflow/components/api/backend.py

Lines changed: 34 additions & 36 deletions
@@ -33,6 +33,7 @@
     fast_cwl_load,
     slow_cwl_load,
     convert_to_workflow,
+    clean_up_dag_run,
     DAG_TEMPLATE
 )
 
@@ -75,7 +76,7 @@ def __init__(self):
 
 
     def get_dags(self, dag_ids=[]):
-        logging.debug(f"Call get_dags with dag_ids={dag_ids}")
+        logging.info(f"Call get_dags with dag_ids={dag_ids}")
         try:
             dag_ids = dag_ids or self.list_dags()
             logging.debug(f"Processing dags {dag_ids}")
@@ -85,41 +86,39 @@ def get_dags(self, dag_ids=[]):
             return {"dags": []}
 
 
-    def post_dag(self, dag_id=None):
-        logging.debug(f"Call post_dag with dag_id={dag_id}")
+    def post_dags(self, dag_id=None):
+        logging.info(f"Call post_dags with dag_id={dag_id}")
         try:
             res = self.export_dag(dag_id or ''.join(random.choice(string.ascii_lowercase) for i in range(32)))
             logging.debug(f"Exported DAG {res}")
             return res
         except Exception as err:
-            logging.error(f"Failed while running post_dag {err}")
+            logging.error(f"Failed while running post_dags {err}")
             return connexion.problem(500, "Failed to create dag", str(err))
 
 
     def get_dag_runs(self, dag_id=None, run_id=None, execution_date=None, state=None):
-        logging.debug(f"Call get_dag_runs with dag_id={dag_id}, run_id={run_id}, execution_date={execution_date}, state={state}")
+        logging.info(f"Call get_dag_runs with dag_id={dag_id}, run_id={run_id}, execution_date={execution_date}, state={state}")
         try:
             dag_runs = []
             dag_ids = [dag_id] if dag_id else self.list_dags()
-            logging.debug(f"Processing dags {dag_ids}")
+            logging.debug(f"Found dags {dag_ids}")
             for d_id in dag_ids:
-                logging.debug(f"Process dag {d_id}")
-                task_ids = self.list_tasks(d_id)
-                logging.debug(f"Fetched tasks {task_ids}")
-                for dag_run in self.list_dag_runs(d_id, state):
-                    logging.debug(f"Process dag run {dag_run['run_id']}, {dag_run['execution_date']}")
-                    if run_id and run_id != dag_run["run_id"] or execution_date and execution_date != dag_run["execution_date"]:
-                        logging.debug(f"Skip dag_run {dag_run['run_id']}, {dag_run['execution_date']} (run_id or execution_date doesn't match)")
+                logging.info(f"Process dag {d_id}")
+                for dag_run in DagRun.find(dag_id=d_id, state=state):
+                    logging.info(f"Process dag_run {dag_run.run_id}, {dag_run.execution_date.isoformat()}")
+                    if run_id and run_id != dag_run.run_id or execution_date and execution_date != dag_run.execution_date.isoformat():
+                        logging.info(f"Skip dag_run {dag_run.run_id}, {dag_run.execution_date.isoformat()} (run_id or execution_date doesn't match)")
                         continue
-                    response_item = {"dag_id": d_id,
-                                     "run_id": dag_run["run_id"],
-                                     "execution_date": dag_run["execution_date"],
-                                     "start_date": dag_run["start_date"],
-                                     "state": dag_run["state"],
-                                     "tasks": []}
-                    logging.debug(f"Get statuses for tasks {task_ids}")
-                    for t_id in task_ids:
-                        response_item["tasks"].append({"id": t_id, "state": self.task_state(d_id, t_id, dag_run["execution_date"])})
+                    response_item = {
+                        "dag_id": d_id,
+                        "run_id": dag_run.run_id,
+                        "execution_date": dag_run.execution_date.isoformat(),
+                        "start_date": dag_run.start_date.isoformat(),
+                        "state": dag_run.state,
+                        "tasks": [{"id": ti.task_id, "state": ti.state} for ti in dag_run.get_task_instances()],
+                        "progress": int(len([ti for ti in dag_run.get_task_instances(State.SUCCESS)]) / len(dag_run.get_task_instances()) * 100)
+                    }
                     dag_runs.append(response_item)
             return {"dag_runs": dag_runs}
         except Exception as err:
@@ -128,7 +127,7 @@ def get_dag_runs(self, dag_id=None, run_id=None, execution_date=None, state=None
 
 
     def post_dag_runs(self, dag_id, run_id=None, conf=None):
-        logging.debug(f"Call post_dag_runs with dag_id={dag_id}, run_id={run_id}, conf={conf}")
+        logging.info(f"Call post_dag_runs with dag_id={dag_id}, run_id={run_id}, conf={conf}")
         try:
             dagrun = self.trigger_dag(dag_id, run_id, conf)
             return {"dag_id": dagrun.dag_id,
@@ -141,9 +140,20 @@ def post_dag_runs(self, dag_id, run_id=None, conf=None):
             return connexion.problem(500, "Failed to create dag_run", str(err))
 
 
+    def post_dags_dag_runs(self, dag_id, run_id, conf=None):
+        logging.info(f"Call post_dags_dag_runs with dag_id={dag_id}, run_id={run_id}, conf={conf}")
+        self.post_dags(dag_id)
+        clean_up_dag_run(
+            dag_id=dag_id,
+            run_id=run_id,
+            kill_timeout=3  # use shorter timeout for killing running tasks
+        )  # should wait until it finishes running, as we can't trigger the same DAG with the same run_id
+        return self.post_dag_runs(dag_id, run_id, conf)
+
+
     def post_dag_runs_legacy(self, dag_id):
         data = connexion.request.json
-        logging.debug(f"Call post_dag_runs_legacy with dag_id={dag_id}, data={data}")
+        logging.info(f"Call post_dag_runs_legacy with dag_id={dag_id}, data={data}")
         return self.post_dag_runs(dag_id, data["run_id"], data["conf"])
 
 
@@ -223,18 +233,6 @@ def task_state(self, dag_id, task_id, execution_date):
         return task_state
 
 
-    def list_dag_runs(self, dag_id, state):
-        dag_runs = []
-        for dag_run in DagRun.find(dag_id=dag_id, state=state):
-            dag_runs.append({
-                "run_id": dag_run.run_id,
-                "state": dag_run.state,
-                "execution_date": dag_run.execution_date.isoformat(),
-                "start_date": ((dag_run.start_date or '') and dag_run.start_date.isoformat())
-            })
-        return dag_runs
-
-
     def save_attachment(self, attachment, location, exist_ok=False):
         if path.isfile(location) and not exist_ok:
             raise FileExistsError(f"File {location} already exist")
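
The refactored get_dag_runs above derives the new progress value directly from the run's task instances instead of querying each task state separately. A standalone sketch of that calculation (the helper name and the empty-run guard are additions for illustration, not part of the commit):

from airflow.models import DagRun
from airflow.utils.state import State

def dag_run_progress(dag_run: DagRun) -> int:
    # Percent of task instances that reached SUCCESS, mirroring the value
    # reported in the 'progress' field of the GET /dag_runs response.
    task_instances = dag_run.get_task_instances()
    if not task_instances:  # guard for runs without task instances (assumption, not in the diff)
        return 0
    succeeded = dag_run.get_task_instances(State.SUCCESS)
    return int(len(succeeded) / len(task_instances) * 100)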

cwl_airflow/components/api/openapi/swagger_configuration.yaml

Lines changed: 69 additions & 2 deletions
@@ -1,7 +1,7 @@
 swagger: "2.0"
 info:
   title: CWL-Airflow API
-  version: 1.0.0
+  version: 1.0.1
 basePath: "/api/experimental"
 schemes:
   - https
@@ -53,7 +53,7 @@ paths:
     post:
       summary: Creates new dag with dag_id from the attached workflow.cwl file or its compressed content.
       description: Creates new dag with dag_id from the attached workflow.cwl file or its compressed content.
-      operationId: post_dag
+      operationId: post_dags
      responses:
        "200":
          description: dag_id, py and cwl file locations of a created dag.
@@ -194,6 +194,70 @@ paths:
       tags:
         - Airflow
 
+  /dags/dag_runs:
+    post:
+      summary: Combined logic from /dags and /dag_runs POSTs
+      description: >-
+        1. Creates new dag with dag_id from the attached workflow.cwl file or its compressed content.
+        Either workflow or workflow_content should be provided.
+        dag_id should follow the naming rule "cwlid-commitsha", otherwise outdated dags won't be deleted.
+        2. Tries to delete all previous dag_runs for the provided dag_id and run_id, which also includes
+        - stopping all running tasks for the current dag_id and run_id
+        - removing corresponding temporary data
+        - cleaning up corresponding records in DB
+        - removing outdated dags for the same cwlid if no running dag_runs were found for them
+        3. Creates new dag_run for dag_id with run_id and optional conf
+      operationId: post_dags_dag_runs
+      responses:
+        "200":
+          description: Reference information about created dag and dag_run.
+          schema:
+            $ref: "#/definitions/PostDagRunsResponse"
+        "400":
+          description: The request is malformed.
+          schema:
+            $ref: "#/definitions/ErrorResponse"
+        "401":
+          description: The request is unauthorized.
+          schema:
+            $ref: "#/definitions/ErrorResponse"
+        "403":
+          description: The requester is not authorized to perform this action.
+          schema:
+            $ref: "#/definitions/ErrorResponse"
+        "500":
+          description: An unexpected error occurred.
+          schema:
+            $ref: "#/definitions/ErrorResponse"
+      parameters:
+        - name: dag_id
+          description: Dag identifier, follow the naming rule "cwlid-commitsha"
+          in: query
+          required: true
+          type: string
+        - name: run_id
+          description: Run identifier
+          in: query
+          required: true
+          type: string
+        - name: conf
+          description: Run configuration (JSON-formatted string)
+          in: query
+          required: false
+          type: string
+        - name: workflow
+          description: CWL workflow file with embedded tools and all other dependencies
+          in: formData
+          required: false
+          type: file
+        - name: workflow_content
+          description: base64 encoded zlib compressed workflow content
+          in: formData
+          required: false
+          type: string
+      tags:
+        - Airflow
+
   # delete this path if not actively used
   /dags/{dag_id}/dag_runs:
     post:
@@ -610,6 +674,7 @@ definitions:
         - start_date
        - state
        - tasks
+        - progress
      properties:
        dag_id:
          type: string
@@ -632,6 +697,8 @@
            type: string
          state:
            $ref: "#/definitions/TaskState"
+        progress:
+          type: integer
      description: Dag run info
 
   DagRunState:
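
Based on the /dags/dag_runs path defined above, a single request can create the DAG, clean up previous runs, and trigger a new dag_run. A minimal client sketch, assuming the API is served at http://127.0.0.1:8081 and a packed workflow file exists locally (all identifiers and paths below are placeholders):

import json
import requests

API = "http://127.0.0.1:8081"  # assumed API address

with open("workflow_packed.cwl", "rb") as workflow_stream:  # placeholder workflow with embedded tools
    r = requests.post(
        url=f"{API}/api/experimental/dags/dag_runs",
        params={
            "dag_id": "my-workflow-0a1b2c3",  # should follow the "cwlid-commitsha" naming rule
            "run_id": "run-001",
            "conf": json.dumps({"job": {"input_file": "/tmp/data.txt"}})
        },
        files={"workflow": workflow_stream}  # alternatively, send json={"workflow_content": ...}
    )
r.raise_for_status()
print(r.json())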

cwl_airflow/components/test/conformance.py

Lines changed: 52 additions & 23 deletions
@@ -199,6 +199,7 @@ def create_dags(suite_data, args, dags_folder=None):
     # TODO: Do we need to force scheduler to reload DAGs after all DAG added?
 
     for test_data in suite_data.values():
+        params = {"dag_id": test_data["dag_id"]}
         workflow_path = os.path.join(
             args.tmp,
             os.path.basename(test_data["tool"])
@@ -214,18 +215,14 @@ def create_dags(suite_data, args, dags_folder=None):
                 logging.info(f"Sending base64 encoded zlib compressed content from {workflow_path}")
                 r = requests.post(
                     url=urljoin(args.api, "/api/experimental/dags"),
-                    params={
-                        "dag_id": test_data["dag_id"]
-                    },
+                    params=params,
                     json={"workflow_content": get_compressed(input_stream)}
                 )
             else:  # attach workflow as a file
                 logging.info(f"Attaching workflow file {workflow_path}")
                 r = requests.post(
                     url=urljoin(args.api, "/api/experimental/dags"),
-                    params={
-                        "dag_id": test_data["dag_id"]
-                    },
+                    params=params,
                     files={"workflow": input_stream}
                 )
 
@@ -241,23 +238,52 @@ def create_dags(suite_data, args, dags_folder=None):
 def trigger_dags(suite_data, args):
     """
     Triggers all DAGs from "suite_data". If failed to trigger DAG, updates
-    "suite_data" with "error" and sets "finished" to True
+    "suite_data" with "error" and sets "finished" to True. In case --combine
+    was set, we will call API that will first create the new DAG, then clean
+    all previous DAG runs based on the provided run_id and dag_id, then remove
+    outdated DAGs for the same workflow (for that dag_id should follow naming
+    rule cwlid-commitsha) and only after that trigger the workflow execution.
+    If not only --combine but also --embed was provided, send base64 encoded
+    zlib compressed content of the workflow file instead of attaching it.
     """
 
     for run_id, test_data in suite_data.items():
-        logging.info(f"Trigger DAG {test_data['dag_id']} from test case {test_data['index']} as {run_id}")
-        r = requests.post(
-            url=urljoin(args.api, "/api/experimental/dag_runs"),
-            params={
-                "run_id": run_id,
-                "dag_id": test_data["dag_id"],
-                "conf": json.dumps(
-                    {
-                        "job": test_data["job"]
-                    }
-                )
-            }
-        )
+        params = {
+            "run_id": run_id,
+            "dag_id": test_data["dag_id"],
+            "conf": json.dumps({"job": test_data["job"]})
+        }
+        if args.combine:  # use API endpoint that combines creating, cleaning and triggering new DAGs
+            logging.info(f"Add and trigger DAG {test_data['dag_id']} from test case {test_data['index']} as {run_id}")
+            workflow_path = os.path.join(
+                args.tmp,
+                os.path.basename(test_data["tool"])
+            )
+            embed_all_runs(  # will save results to "workflow_path"
+                workflow_tool=fast_cwl_load(test_data["tool"]),
+                location=workflow_path
+            )
+            with open(workflow_path, "rb") as input_stream:
+                if args.embed:  # send base64 encoded zlib compressed workflow content that will be embedded into DAG python file
+                    logging.info(f"Sending base64 encoded zlib compressed content from {workflow_path}")
+                    r = requests.post(
+                        url=urljoin(args.api, "/api/experimental/dags/dag_runs"),
+                        params=params,
+                        json={"workflow_content": get_compressed(input_stream)}
+                    )
+                else:  # attach workflow as a file
+                    logging.info(f"Attaching workflow file {workflow_path}")
+                    r = requests.post(
+                        url=urljoin(args.api, "/api/experimental/dags/dag_runs"),
+                        params=params,
+                        files={"workflow": input_stream}
+                    )
+        else:
+            logging.info(f"Trigger DAG {test_data['dag_id']} from test case {test_data['index']} as {run_id}")
+            r = requests.post(
+                url=urljoin(args.api, "/api/experimental/dag_runs"),
+                params=params
+            )
         if not r.ok:
             reason = get_api_failure_reason(r)
             logging.error(f"Failed to trigger DAG {test_data['dag_id']} from test case {test_data['index']} as {run_id} due to {reason}")
@@ -287,8 +313,10 @@ def run_test_conformance(args):
     suite_data = load_test_suite(args)
     results_queue = queue.Queue(maxsize=len(suite_data))
 
-    # Create new dags
-    create_dags(suite_data, args) # only reads from "suite_data"
+    # Create new DAGs if --combine wasn't set and we want to use two
+    # separate API calls for creating and triggering DAGs
+    if not args.combine:
+        create_dags(suite_data, args) # only reads from "suite_data"
 
     # Start thread to listen for status updates before
     # we trigger DAGs. "results_queue" is thread safe
@@ -301,7 +329,8 @@ def run_test_conformance(args):
 
     # Trigger all dags updating "suite_data" items with "error" and "finished"=True
     # for all DAG runs that we failed to trigger. Writing to "suite_data" is not
-    # thread safe!
+    # thread safe! If --combine was set, this function will also create new DAGs
+    # and clean old DAG runs
     trigger_dags(suite_data, args)
 
     # Start checker thread to evaluate received results.
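
For comparison with the --combine branch above, the pre-existing two-call flow used when --combine is not set first creates the DAG and then triggers it separately. A rough sketch of those two requests outside the test harness (addresses, paths, and identifiers are placeholders):

import json
import requests

API = "http://127.0.0.1:8081"   # assumed API address
dag_id = "my-workflow-0a1b2c3"  # placeholder identifiers
run_id = "run-001"

# 1. Create the DAG from an attached workflow file (POST /dags)
with open("workflow_packed.cwl", "rb") as workflow_stream:
    requests.post(
        url=f"{API}/api/experimental/dags",
        params={"dag_id": dag_id},
        files={"workflow": workflow_stream}
    ).raise_for_status()

# 2. Trigger it with a separate call (POST /dag_runs)
requests.post(
    url=f"{API}/api/experimental/dag_runs",
    params={
        "dag_id": dag_id,
        "run_id": run_id,
        "conf": json.dumps({"job": {}})
    }
).raise_for_status()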
