Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 47 additions & 4 deletions blueprints/airflow_api.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
from datetime import datetime
import json
import os
import six
import time

from flask import Blueprint, request, Response
from flask import Blueprint, request, Response, jsonify, url_for
from sqlalchemy import or_
from airflow import settings
from airflow.exceptions import AirflowException, AirflowConfigException
from airflow.models import DagBag, DagRun
from airflow.models import DagBag, DagRun, DagModel
from airflow.utils.state import State
from airflow.utils.dates import date_range as utils_date_range
from airflow.www.app import csrf
Expand Down Expand Up @@ -65,6 +64,7 @@ def unauthorized(error='Not authorized to access this resource'):
def server_error(error='An unexpected problem occurred'):
return ApiResponse.error(ApiResponse.STATUS_SERVER_ERROR, error)


@airflow_api_blueprint.before_request
def verify_authentication():
authorization = request.headers.get('authorization')
Expand Down Expand Up @@ -121,6 +121,29 @@ def dags_index():
return ApiResponse.success({'dags': dags})


@csrf.exempt
@airflow_api_blueprint.route('/dags/<string:dag_id>/paused/<string:paused>',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about it, we may want to change the route to be '/dags/string:dag_id', have the call expect a POST'ed JSON object that currently only expect { "paused": true/false }, and then change the method from POST to PATCH. That way we can extend this later for any other update operations on a DAG. Maybe change the function def name to be "update_dag".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about a POST to /dags with all the params in the object, passing in the following?
{ "dag_id":"dag1", "paused": true/false }

Then you could bulk update in one request:
{ "dag_id":"dag1,dag2,dag3", "paused": true/false }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in general for the case of multiple records being updated in the same call we can add a secondary URL. ie, have both /dags and /dags/[dag_id]. However looking at the schema, since pretty much the only property of a dag that should be manually changed is the "is_paused" property, this should be fine. However let's make the JSON be:
{ "dag_id": ["dag1","dag2","dag3"], "paused": true/false }, and make it a PUT. Try to keep things RESTful, reserve POST for just creation of records.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cwurtz Got it.

A thought though: Should you be able to update multiple records with varying values? So set dag1 and dag2 to true but dag3 to false?

In that case, the above wouldn't work.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you think there is a use case for it. I'd imagine the vast majority of the time if someone is updating multiple dags, it would be to set them all as true or all as false. Worst case they have to make two API calls.

If you disagree and think it would be more common, we could do something like:

[
  { "dag_id": "dag1", "paused": true},
  { "dag_id": "dag2", "paused": false }
]

And optionally allow just { "dag_id": "dag1", "paused": false } for updating a single dag (check the input at the start of the call and coerce it into an array to keep the logic consistent)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that might be a bit much for now. I'll update the PR to include PUT calls and to accept multiple dag_ids but that's where I'll leave it.

methods=['POST'])
def dag_paused(dag_id, paused):
"""(Un)pauses a dag"""

session = settings.Session()
orm_dag = (
session.query(DagModel)
.filter(DagModel.dag_id == dag_id).first()
)
if paused == 'true':
orm_dag.is_paused = True
else:
orm_dag.is_paused = False

session.merge(orm_dag)
session.commit()
session.close()

return ApiResponse.success({dag_id: paused})


@airflow_api_blueprint.route('/dag_runs', methods=['GET'])
def get_dag_runs():
dag_runs = []
Expand Down Expand Up @@ -148,6 +171,7 @@ def get_dag_runs():

return ApiResponse.success({'dag_runs': dag_runs})


@csrf.exempt
@airflow_api_blueprint.route('/dag_runs', methods=['POST'])
def create_dag_run():
Expand Down Expand Up @@ -284,4 +308,23 @@ def get_dag_run(dag_run_id):

session.close()

return ApiResponse.success({'dag_run': format_dag_run(dag_run)})
return ApiResponse.success({'dag_run': format_dag_run(dag_run)})


@airflow_api_blueprint.route('/latest_runs', methods=['GET'])
def latest_dag_runs():
"""Returns the latest DagRun for each DAG formatted for the UI. """
dagruns = DagRun.get_latest_runs()
payload = []
for dagrun in dagruns:
if dagrun.execution_date:
payload.append({
'dag_id': dagrun.dag_id,
'execution_date': dagrun.execution_date.isoformat(),
'start_date': ((dagrun.start_date or '') and
dagrun.start_date.isoformat()),
'dag_run_url': url_for('airflow.graph',
dag_id=dagrun.dag_id,
execution_date=dagrun.execution_date)
})
return ApiResponse.success(items=payload)