Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ Returns a list of dag runs, up to 100 per request. It can be filtered by a star
| state | Query | No | String | | Filter dag runs by a specified state |
| external_trigger | Query | No | Boolean | | Filter dag runs by whether or not they were triggered internally (ie by the scheduler) or externally (ie this API or the CLI |
| prefix | Query | No | String | | Filter dag runs to only the runs with a `run_id` containing the full prefix specified |
| dag_id | Query | No | String | | Filter dag runs to only the runs with `dag_id` |

**Success Response**

Expand Down
30 changes: 14 additions & 16 deletions blueprints/airflow_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from airflow.utils.state import State
from airflow.utils.dates import date_range as utils_date_range
from airflow.www.app import csrf
import urllib

airflow_api_blueprint = Blueprint('airflow_api', __name__, url_prefix='/api/v1')

Expand Down Expand Up @@ -77,6 +78,14 @@ def verify_authentication():
return ApiResponse.unauthorized("You are not authorized to use this resource")


def format_url(execution_date, dag_id):
encoded_execution_date = '+'.join(urllib.quote(execution_date).split('T'))
encoded_dag_id = urllib.quote(dag_id)
return '/admin/airflow/graph?execution_date={execution_date}&dag_id={dag_id}'.format(
dag_id=encoded_dag_id,
execution_date=encoded_execution_date)


def format_dag_run(dag_run):
return {
'run_id': dag_run.run_id,
Expand All @@ -85,7 +94,8 @@ def format_dag_run(dag_run):
'start_date': (None if not dag_run.start_date else str(dag_run.start_date)),
'end_date': (None if not dag_run.end_date else str(dag_run.end_date)),
'external_trigger': dag_run.external_trigger,
'execution_date': str(dag_run.execution_date)
'execution_date': str(dag_run.execution_date),
'dag_run_url': format_url(str(dag_run.execution_date), dag_run.dag_id)
}


Expand Down Expand Up @@ -139,6 +149,9 @@ def get_dag_runs():
if request.args.get('prefix') is not None:
query = query.filter(DagRun.run_id.ilike('{}%'.format(request.args.get('prefix'))))

if request.args.get('dag_id') is not None:
query = query.filter(DagRun.dag_id == request.args.get('dag_id'))

runs = query.order_by(DagRun.execution_date).all()

for run in runs:
Expand Down Expand Up @@ -270,18 +283,3 @@ def create_dag_run():

return ApiResponse.success({'dag_run_ids': results})


@airflow_api_blueprint.route('/dag_runs/<dag_run_id>', methods=['GET'])
def get_dag_run(dag_run_id):
session = settings.Session()

runs = DagRun.find(run_id=dag_run_id, session=session)

if len(runs) == 0:
return ApiResponse.not_found('Dag run not found')

dag_run = runs[0]

session.close()

return ApiResponse.success({'dag_run': format_dag_run(dag_run)})