From bf703c04f10042df786c8dcf3170d9638d688304 Mon Sep 17 00:00:00 2001 From: Benjamin Gregory Date: Tue, 29 May 2018 19:34:29 -0600 Subject: [PATCH] Add Dag Paused and Latest Dag Runs endpoint --- blueprints/airflow_api.py | 51 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 47 insertions(+), 4 deletions(-) diff --git a/blueprints/airflow_api.py b/blueprints/airflow_api.py index 2d1f0ab..ffd9ef3 100644 --- a/blueprints/airflow_api.py +++ b/blueprints/airflow_api.py @@ -1,14 +1,13 @@ from datetime import datetime import json -import os import six import time -from flask import Blueprint, request, Response +from flask import Blueprint, request, Response, jsonify, url_for from sqlalchemy import or_ from airflow import settings from airflow.exceptions import AirflowException, AirflowConfigException -from airflow.models import DagBag, DagRun +from airflow.models import DagBag, DagRun, DagModel from airflow.utils.state import State from airflow.utils.dates import date_range as utils_date_range from airflow.www.app import csrf @@ -65,6 +64,7 @@ def unauthorized(error='Not authorized to access this resource'): def server_error(error='An unexpected problem occurred'): return ApiResponse.error(ApiResponse.STATUS_SERVER_ERROR, error) + @airflow_api_blueprint.before_request def verify_authentication(): authorization = request.headers.get('authorization') @@ -121,6 +121,29 @@ def dags_index(): return ApiResponse.success({'dags': dags}) +@csrf.exempt +@airflow_api_blueprint.route('/dags//paused/', + methods=['POST']) +def dag_paused(dag_id, paused): + """(Un)pauses a dag""" + + session = settings.Session() + orm_dag = ( + session.query(DagModel) + .filter(DagModel.dag_id == dag_id).first() + ) + if paused == 'true': + orm_dag.is_paused = True + else: + orm_dag.is_paused = False + + session.merge(orm_dag) + session.commit() + session.close() + + return ApiResponse.success({dag_id: paused}) + + @airflow_api_blueprint.route('/dag_runs', methods=['GET']) def get_dag_runs(): dag_runs = [] @@ -148,6 +171,7 @@ def get_dag_runs(): return ApiResponse.success({'dag_runs': dag_runs}) + @csrf.exempt @airflow_api_blueprint.route('/dag_runs', methods=['POST']) def create_dag_run(): @@ -284,4 +308,23 @@ def get_dag_run(dag_run_id): session.close() - return ApiResponse.success({'dag_run': format_dag_run(dag_run)}) \ No newline at end of file + return ApiResponse.success({'dag_run': format_dag_run(dag_run)}) + + +@airflow_api_blueprint.route('/latest_runs', methods=['GET']) +def latest_dag_runs(): + """Returns the latest DagRun for each DAG formatted for the UI. """ + dagruns = DagRun.get_latest_runs() + payload = [] + for dagrun in dagruns: + if dagrun.execution_date: + payload.append({ + 'dag_id': dagrun.dag_id, + 'execution_date': dagrun.execution_date.isoformat(), + 'start_date': ((dagrun.start_date or '') and + dagrun.start_date.isoformat()), + 'dag_run_url': url_for('airflow.graph', + dag_id=dagrun.dag_id, + execution_date=dagrun.execution_date) + }) + return ApiResponse.success(items=payload)