From 23703c1bb59ed5a3f9b1df7703196eb60d4f7c76 Mon Sep 17 00:00:00 2001 From: Shreya Chakraborty Date: Fri, 11 Oct 2019 21:50:10 +0200 Subject: [PATCH] Add sensor to check the status of redshift cluster This is used for checking when we resize a cluster or newly create it from snapshots, and hence wait until the cluster is available for us to run queries on it --- operators/redshift_cluster_sensor_plugin.py | 71 +++++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 operators/redshift_cluster_sensor_plugin.py diff --git a/operators/redshift_cluster_sensor_plugin.py b/operators/redshift_cluster_sensor_plugin.py new file mode 100644 index 0000000..4743575 --- /dev/null +++ b/operators/redshift_cluster_sensor_plugin.py @@ -0,0 +1,71 @@ +import logging + +import boto3 +from airflow.operators.sensors import BaseSensorOperator +from airflow.plugins_manager import AirflowPlugin +from airflow.utils.decorators import apply_defaults +from airflow.hooks.base_hook import BaseHook + + +class RedshiftClusterStatusSensor(BaseSensorOperator): + """ + Check if the cluster is in `available` status to be able to run queries + :param cluster_id: The redshift cluster id. + :type cluster_id: string + :param region_name: The AWS region in which the cluster resides. + :type redshift_schema: string + :param s3_conn_id: The source s3 connection id. + :type s3_conn_id: string + """ + + template_fields = () + template_ext = () + ui_color = '#daf7a6' + + @apply_defaults + def __init__(self, + cluster_id, + region_name='eu-central-1', + aws_conn_id=None, + *args, **kwargs): + self.cluster_id = cluster_id + self.region_name = region_name + self.aws_conn_id = aws_conn_id + super(RedshiftClusterStatusSensor, self).__init__(*args, **kwargs) + + def poke(self, context): + conn = BaseHook.get_connection(self.aws_conn_id) + redshift_client = boto3.client('redshift', + region_name=self.region_name, + aws_access_key_id=conn.login, + aws_secret_access_key=conn.password) + response = redshift_client.describe_clusters( + ClusterIdentifier=self.cluster_id) + + status = response['Clusters'][0]['ClusterStatus'] + + if status != 'available': + message = ("The redshift cluster {cluster_id} " + "is not available for running queries!!") + logging.info(message.format(cluster_id=self.cluster_id)) + return False + + return True + + +class RedshiftClusterStatusPlugin(AirflowPlugin): + name = "redshift_cluster_status_sensor" + operators = [RedshiftClusterStatusSensor] + # A list of class(es) derived from BaseHook + hooks = [] + # A list of class(es) derived from BaseExecutor + executors = [] + # A list of references to inject into the macros namespace + macros = [] + # A list of objects created from a class derived + # from flask_admin.BaseView + admin_views = [] + # A list of Blueprint object created from flask.Blueprint + flask_blueprints = [] + # A list of menu links (flask_admin.base.MenuLink) + menu_links = [] \ No newline at end of file