Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 131 additions & 2 deletions blueprints/airflow_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@
import os
import six
import time
import logging

from flask import Blueprint, request, Response
from sqlalchemy import or_
from airflow import settings
from airflow.exceptions import AirflowException, AirflowConfigException
from airflow.models import DagBag, DagRun
from airflow.models import DagBag, DagRun, DagModel
from airflow.utils.state import State
from airflow.utils.dates import date_range as utils_date_range
from airflow.www.app import csrf
from airflow.bin.cli import pause, unpause, get_dag as cli_get_dag, connections as cli_connections

airflow_api_blueprint = Blueprint('airflow_api', __name__, url_prefix='/api/v1')

Expand Down Expand Up @@ -65,6 +67,14 @@ def unauthorized(error='Not authorized to access this resource'):
def server_error(error='An unexpected problem occurred'):
return ApiResponse.error(ApiResponse.STATUS_SERVER_ERROR, error)


class dagCliArgs:
def __init__(self, dag_id, subdir):
self.subdir = subdir
self.dag_id = dag_id



@airflow_api_blueprint.before_request
def verify_authentication():
authorization = request.headers.get('authorization')
Expand Down Expand Up @@ -120,6 +130,55 @@ def dags_index():

return ApiResponse.success({'dags': dags})

@csrf.exempt
@airflow_api_blueprint.route('/dags/<dag_id>', methods=['PUT'])
def dag_update(dag_id):
args = dagCliArgs(dag_id, 'dags')
try:
dag = cli_get_dag(args)
except AirflowException:
return ApiResponse.not_found('Could not find a dag with ID {}'.format(dag_id))
logging.info("Processing dag {} PUT body {}".format(dag_id, request.get_json()))
body = request.get_json()
if body is None or 'is_active' not in body:
return ApiResponse.bad_request("A Json body with 'is_active': True/False is expected")

try:
if body['is_active']:
unpause(None, dag)
elif not body['is_active']:
pause(None, dag)
except AirflowException:
return ApiResponse.not_found('Could not pause/unpause dag with ID {}'.format(dag_id))

payload = {
'dag_id': dag_id,
'full_path': dag.full_filepath,
'is_active': (not dag.is_paused),
'last_execution': str(dag.latest_execution_date)
}

return ApiResponse.success(payload)


@airflow_api_blueprint.route('/dags/<dag_id>', methods=['GET'])
def get_dag(dag_id):
args = dagCliArgs(dag_id, 'dags')

try:
dag = cli_get_dag(args)
except AirflowException:
return ApiResponse.not_found('Could not find a dag with ID {}'.format(dag_id))

payload = {
'dag_id': dag_id,
'full_path': dag.full_filepath,
'is_active': (not dag.is_paused),
'last_execution': str(dag.latest_execution_date)
}

return ApiResponse.success(payload)


@airflow_api_blueprint.route('/dag_runs', methods=['GET'])
def get_dag_runs():
Expand Down Expand Up @@ -148,6 +207,7 @@ def get_dag_runs():

return ApiResponse.success({'dag_runs': dag_runs})


@csrf.exempt
@airflow_api_blueprint.route('/dag_runs', methods=['POST'])
def create_dag_run():
Expand Down Expand Up @@ -284,4 +344,73 @@ def get_dag_run(dag_run_id):

session.close()

return ApiResponse.success({'dag_run': format_dag_run(dag_run)})
return ApiResponse.success({'dag_run': format_dag_run(dag_run)})

class connectionCliArgs:
def __init__(self, mode, conn_id=None, conn_uri=None ):

self.list = False
self.delete = False
self.add = False

if mode == 'list':
self.list = True
elif mode == 'delete':
self.delete = True
elif mode == 'add':
self.add = True

self.conn_id = conn_id
self.conn_uri = conn_uri
self.conn_extra = None
self.conn_type = None
self.conn_host = None
self.conn_login = None
self.conn_password = None
self.conn_schema = None
self.conn_port = None

@csrf.exempt
@airflow_api_blueprint.route('/connections/<conn_id>', methods=['DELETE'])
def delete_connections(conn_id):
args = connectionCliArgs('delete',conn_id=conn_id)

try:
cli_connections(args)
except AirflowException:
return ApiResponse.error('Could not delete the connection')

payload = {
'status': 'deleted'
}

return ApiResponse.success(payload)

@csrf.exempt
@airflow_api_blueprint.route('/connections', methods=['POST'])
def add_connections():

# decode input
data = request.get_json(force=True)
# ensure there is a conn_id
if 'conn_id' not in data or data['conn_id'] is None:
return ApiResponse.bad_request('Must specify the connection id (conn_id) for the new connection')
conn_id = data['conn_id']

# ensure there is a dag id
if 'conn_uri' not in data or data['conn_uri'] is None:
return ApiResponse.bad_request('Must specify the connection uri (conn_uri) for the new connection')
conn_uri = data['conn_uri']

args = connectionCliArgs('add',conn_id=conn_id, conn_uri=conn_uri)

try:
cli_connections(args)
except AirflowException:
return ApiResponse.error('Could not add the new connection ')

payload = {
'status': 'created'
}

return ApiResponse.success(payload)