diff --git a/README.md b/README.md index d789da7..077b9ec 100644 --- a/README.md +++ b/README.md @@ -133,6 +133,7 @@ Returns a list of dag runs, up to 100 per request. It can be filtered by a star | state | Query | No | String | | Filter dag runs by a specified state | | external_trigger | Query | No | Boolean | | Filter dag runs by whether or not they were triggered internally (ie by the scheduler) or externally (ie this API or the CLI | | prefix | Query | No | String | | Filter dag runs to only the runs with a `run_id` containing the full prefix specified | +| dag_id | Query | No | String | | Filter dag runs to only the runs with `dag_id` | **Success Response** diff --git a/blueprints/airflow_api.py b/blueprints/airflow_api.py index 2d1f0ab..7e44e70 100644 --- a/blueprints/airflow_api.py +++ b/blueprints/airflow_api.py @@ -12,6 +12,7 @@ from airflow.utils.state import State from airflow.utils.dates import date_range as utils_date_range from airflow.www.app import csrf +import urllib airflow_api_blueprint = Blueprint('airflow_api', __name__, url_prefix='/api/v1') @@ -77,6 +78,14 @@ def verify_authentication(): return ApiResponse.unauthorized("You are not authorized to use this resource") +def format_url(execution_date, dag_id): + encoded_execution_date = '+'.join(urllib.quote(execution_date).split('T')) + encoded_dag_id = urllib.quote(dag_id) + return '/admin/airflow/graph?execution_date={execution_date}&dag_id={dag_id}'.format( + dag_id=encoded_dag_id, + execution_date=encoded_execution_date) + + def format_dag_run(dag_run): return { 'run_id': dag_run.run_id, @@ -85,7 +94,8 @@ def format_dag_run(dag_run): 'start_date': (None if not dag_run.start_date else str(dag_run.start_date)), 'end_date': (None if not dag_run.end_date else str(dag_run.end_date)), 'external_trigger': dag_run.external_trigger, - 'execution_date': str(dag_run.execution_date) + 'execution_date': str(dag_run.execution_date), + 'dag_run_url': format_url(str(dag_run.execution_date), dag_run.dag_id) } @@ -139,6 +149,9 @@ def get_dag_runs(): if request.args.get('prefix') is not None: query = query.filter(DagRun.run_id.ilike('{}%'.format(request.args.get('prefix')))) + if request.args.get('dag_id') is not None: + query = query.filter(DagRun.dag_id == request.args.get('dag_id')) + runs = query.order_by(DagRun.execution_date).all() for run in runs: @@ -270,18 +283,3 @@ def create_dag_run(): return ApiResponse.success({'dag_run_ids': results}) - -@airflow_api_blueprint.route('/dag_runs/', methods=['GET']) -def get_dag_run(dag_run_id): - session = settings.Session() - - runs = DagRun.find(run_id=dag_run_id, session=session) - - if len(runs) == 0: - return ApiResponse.not_found('Dag run not found') - - dag_run = runs[0] - - session.close() - - return ApiResponse.success({'dag_run': format_dag_run(dag_run)}) \ No newline at end of file