diff --git a/blueprints/airflow_api.py b/blueprints/airflow_api.py index 2d1f0ab..01bee25 100644 --- a/blueprints/airflow_api.py +++ b/blueprints/airflow_api.py @@ -3,15 +3,17 @@ import os import six import time +import logging from flask import Blueprint, request, Response from sqlalchemy import or_ from airflow import settings from airflow.exceptions import AirflowException, AirflowConfigException -from airflow.models import DagBag, DagRun +from airflow.models import DagBag, DagRun, DagModel from airflow.utils.state import State from airflow.utils.dates import date_range as utils_date_range from airflow.www.app import csrf +from airflow.bin.cli import pause, unpause, get_dag as cli_get_dag, connections as cli_connections airflow_api_blueprint = Blueprint('airflow_api', __name__, url_prefix='/api/v1') @@ -65,6 +67,14 @@ def unauthorized(error='Not authorized to access this resource'): def server_error(error='An unexpected problem occurred'): return ApiResponse.error(ApiResponse.STATUS_SERVER_ERROR, error) + +class dagCliArgs: + def __init__(self, dag_id, subdir): + self.subdir = subdir + self.dag_id = dag_id + + + @airflow_api_blueprint.before_request def verify_authentication(): authorization = request.headers.get('authorization') @@ -120,6 +130,55 @@ def dags_index(): return ApiResponse.success({'dags': dags}) +@csrf.exempt +@airflow_api_blueprint.route('/dags/', methods=['PUT']) +def dag_update(dag_id): + args = dagCliArgs(dag_id, 'dags') + try: + dag = cli_get_dag(args) + except AirflowException: + return ApiResponse.not_found('Could not find a dag with ID {}'.format(dag_id)) + logging.info("Processing dag {} PUT body {}".format(dag_id, request.get_json())) + body = request.get_json() + if body is None or 'is_active' not in body: + return ApiResponse.bad_request("A Json body with 'is_active': True/False is expected") + + try: + if body['is_active']: + unpause(None, dag) + elif not body['is_active']: + pause(None, dag) + except AirflowException: + return ApiResponse.not_found('Could not pause/unpause dag with ID {}'.format(dag_id)) + + payload = { + 'dag_id': dag_id, + 'full_path': dag.full_filepath, + 'is_active': (not dag.is_paused), + 'last_execution': str(dag.latest_execution_date) + } + + return ApiResponse.success(payload) + + +@airflow_api_blueprint.route('/dags/', methods=['GET']) +def get_dag(dag_id): + args = dagCliArgs(dag_id, 'dags') + + try: + dag = cli_get_dag(args) + except AirflowException: + return ApiResponse.not_found('Could not find a dag with ID {}'.format(dag_id)) + + payload = { + 'dag_id': dag_id, + 'full_path': dag.full_filepath, + 'is_active': (not dag.is_paused), + 'last_execution': str(dag.latest_execution_date) + } + + return ApiResponse.success(payload) + @airflow_api_blueprint.route('/dag_runs', methods=['GET']) def get_dag_runs(): @@ -148,6 +207,7 @@ def get_dag_runs(): return ApiResponse.success({'dag_runs': dag_runs}) + @csrf.exempt @airflow_api_blueprint.route('/dag_runs', methods=['POST']) def create_dag_run(): @@ -284,4 +344,73 @@ def get_dag_run(dag_run_id): session.close() - return ApiResponse.success({'dag_run': format_dag_run(dag_run)}) \ No newline at end of file + return ApiResponse.success({'dag_run': format_dag_run(dag_run)}) + +class connectionCliArgs: + def __init__(self, mode, conn_id=None, conn_uri=None ): + + self.list = False + self.delete = False + self.add = False + + if mode == 'list': + self.list = True + elif mode == 'delete': + self.delete = True + elif mode == 'add': + self.add = True + + self.conn_id = conn_id + self.conn_uri = conn_uri + self.conn_extra = None + self.conn_type = None + self.conn_host = None + self.conn_login = None + self.conn_password = None + self.conn_schema = None + self.conn_port = None + +@csrf.exempt +@airflow_api_blueprint.route('/connections/', methods=['DELETE']) +def delete_connections(conn_id): + args = connectionCliArgs('delete',conn_id=conn_id) + + try: + cli_connections(args) + except AirflowException: + return ApiResponse.error('Could not delete the connection') + + payload = { + 'status': 'deleted' + } + + return ApiResponse.success(payload) + +@csrf.exempt +@airflow_api_blueprint.route('/connections', methods=['POST']) +def add_connections(): + + # decode input + data = request.get_json(force=True) + # ensure there is a conn_id + if 'conn_id' not in data or data['conn_id'] is None: + return ApiResponse.bad_request('Must specify the connection id (conn_id) for the new connection') + conn_id = data['conn_id'] + + # ensure there is a dag id + if 'conn_uri' not in data or data['conn_uri'] is None: + return ApiResponse.bad_request('Must specify the connection uri (conn_uri) for the new connection') + conn_uri = data['conn_uri'] + + args = connectionCliArgs('add',conn_id=conn_id, conn_uri=conn_uri) + + try: + cli_connections(args) + except AirflowException: + return ApiResponse.error('Could not add the new connection ') + + payload = { + 'status': 'created' + } + + return ApiResponse.success(payload)