|
| 1 | +from kafka import KafkaConsumer |
| 2 | +import json |
| 3 | +import requests |
| 4 | +import logging |
| 5 | +from defusedxml.minidom import parseString |
| 6 | + |
| 7 | + |
| 8 | +def transform(argo_event, environment): |
| 9 | + """Transform an argo status event to an alerta alert |
| 10 | +
|
| 11 | + Args: |
| 12 | + argo_event: obj. Json representation of an argo status event |
| 13 | + environment: str. Alerta enviroment parameter to build the alert |
| 14 | +
|
| 15 | + Return: |
| 16 | + obj: Json representation of an alerta alert |
| 17 | + """ |
| 18 | + status = argo_event["status"].lower() |
| 19 | + hostname = argo_event["hostname"] |
| 20 | + metric = argo_event["metric"] |
| 21 | + group = argo_event["endpoint_group"] |
| 22 | + etype = argo_event["type"] |
| 23 | + service = argo_event["service"] |
| 24 | + |
| 25 | + # alerta vars |
| 26 | + resource = "" |
| 27 | + event = "status" |
| 28 | + alerta_service = [] |
| 29 | + |
| 30 | + if etype == "endpoint_group": |
| 31 | + alerta_service.append("endpoint_group") |
| 32 | + resource = group |
| 33 | + elif etype == "service": |
| 34 | + alerta_service.append("service") |
| 35 | + resource = group + "/" + service |
| 36 | + elif etype == "endpoint": |
| 37 | + alerta_service.append("endpoint") |
| 38 | + resource = group + "/" + service + "/" + hostname |
| 39 | + elif etype == "metric": |
| 40 | + alerta_service.append("metric") |
| 41 | + resource = group + "/" + service + "/" + hostname + "/" + metric |
| 42 | + # prepare alerta json |
| 43 | + alerta = {"environment": environment, "event": event, "resource": resource, |
| 44 | + "service": alerta_service, "severity": status} |
| 45 | + |
| 46 | + return alerta |
| 47 | + |
| 48 | + |
| 49 | +def read_and_send(message, environment, alerta_url, alerta_token): |
| 50 | + """Read an argo status event from kafka and send it to alerta |
| 51 | +
|
| 52 | + Args: |
| 53 | + message: str. Current message from kafka queue |
| 54 | + environment: str. Alerta environment to be used (e.g. 'Devel') |
| 55 | + alerta_url: str. Alerta api endpoint |
| 56 | + alerta_token: str. Alerta api access token |
| 57 | +
|
| 58 | + """ |
| 59 | + try: |
| 60 | + argo_event = json.loads(message.value) |
| 61 | + except ValueError: |
| 62 | + logging.warning("NOT JSON: " + message.value) |
| 63 | + return |
| 64 | + |
| 65 | + try: |
| 66 | + alerta = transform(argo_event, environment) |
| 67 | + except KeyError: |
| 68 | + logging.warning("WRONG JSON SCHEMA: " + message.value) |
| 69 | + return |
| 70 | + |
| 71 | + logging.info("Attempting to send alert:" + json.dumps(alerta)) |
| 72 | + headers = {'Authorization': 'Key ' + alerta_token, |
| 73 | + 'Content-Type': 'application/json'} |
| 74 | + |
| 75 | + r = requests.post(alerta_url + "/alert", headers=headers, |
| 76 | + data=json.dumps(alerta)) |
| 77 | + |
| 78 | + if r.status_code == 201: |
| 79 | + logging.info("Alert send to alerta successfully") |
| 80 | + else: |
| 81 | + logging.warning("Alert wasn't send to alerta") |
| 82 | + logging.warning(r.text) |
| 83 | + |
| 84 | + |
| 85 | +def start_listening(environment, kafka_endpoints, kafka_topic, |
| 86 | + alerta_endpoint, alerta_token): |
| 87 | + """Start listening to a kafka topic and send alerts to an alerta endpoint |
| 88 | +
|
| 89 | + Args: |
| 90 | + environment: str. Alerta environment to be used (e.g. 'Devel') |
| 91 | + kafka_endpoints: str. kafka broker endpoint |
| 92 | + kafka_topic: str. kafka topic to listen t |
| 93 | + alerta_endpoint: str. Alerta api endpoint |
| 94 | + alerta_token: str. Alerta api access token |
| 95 | +
|
| 96 | + """ |
| 97 | + |
| 98 | + # Initialize kafka |
| 99 | + kafka_list = kafka_endpoints.split(',') |
| 100 | + |
| 101 | + consumer = KafkaConsumer(kafka_topic, |
| 102 | + group_id='argo-alerta', |
| 103 | + bootstrap_servers=kafka_list) |
| 104 | + for message in consumer: |
| 105 | + read_and_send(message, environment, alerta_endpoint, alerta_token) |
| 106 | + |
| 107 | + |
| 108 | +def gocdb_to_contacts(gocdb_xml, use_notif_flag, test_emails): |
| 109 | + """Transform gocdb xml schema info on generic contacts json information |
| 110 | +
|
| 111 | + Args: |
| 112 | + gocdb_xml: str. Data in gocdb xml format |
| 113 | + use_notif_flag: boolean. Examine or not notifications flag when gathering contacts |
| 114 | +
|
| 115 | + Return: |
| 116 | + obj: Json representation of contact information |
| 117 | + """ |
| 118 | + xmldoc = parseString(gocdb_xml) |
| 119 | + contacts = [] |
| 120 | + clist = xmldoc.getElementsByTagName("CONTACT_EMAIL") |
| 121 | + |
| 122 | + indx = 0 |
| 123 | + for item in clist: |
| 124 | + |
| 125 | + # By default accept all contacts |
| 126 | + notify_val = 'Y' |
| 127 | + # If flag on accept only contacts with notification flag |
| 128 | + if use_notif_flag: |
| 129 | + notify = item.parentNode.getElementsByTagName('NOTIFICATIONS')[0] |
| 130 | + # if notification flag is set to false skip |
| 131 | + notify_val = notify.firstChild.nodeValue |
| 132 | + |
| 133 | + if notify_val == 'TRUE' or notify_val == 'Y': |
| 134 | + c = dict() |
| 135 | + c["type"] = item.parentNode.tagName |
| 136 | + |
| 137 | + # Check if name tag exists |
| 138 | + name_tags = item.parentNode.getElementsByTagName("NAME") |
| 139 | + # if not check short name tag |
| 140 | + if len(name_tags) == 0: |
| 141 | + name_tags = item.parentNode.getElementsByTagName("SHORT_NAME") |
| 142 | + |
| 143 | + # if still no name related tag skip |
| 144 | + if len(name_tags) == 0: |
| 145 | + continue |
| 146 | + |
| 147 | + c["name"] = name_tags[0].firstChild.nodeValue |
| 148 | + |
| 149 | + if test_emails is None: |
| 150 | + c["email"] = item.firstChild.nodeValue |
| 151 | + else: |
| 152 | + c["email"] = test_emails[indx % len(test_emails)] |
| 153 | + indx = indx + 1 |
| 154 | + |
| 155 | + contacts.append(c) |
| 156 | + |
| 157 | + return contacts |
| 158 | + |
| 159 | + |
| 160 | +def contacts_to_alerta(contacts): |
| 161 | + """Transform a contacts json object to alerta's rule json object |
| 162 | +
|
| 163 | + Args: |
| 164 | + contacts: obj. Json representation of contact information |
| 165 | +
|
| 166 | + Return: |
| 167 | + obj: Json representation of alerta mailer rules |
| 168 | + """ |
| 169 | + rules = [] |
| 170 | + for c in contacts: |
| 171 | + |
| 172 | + rule_name = "rule_" + c["name"] |
| 173 | + rule_fields = [{u"field": u"resource", u"regex": c["name"]}] |
| 174 | + rule_contacts = [c["email"]] |
| 175 | + rule_exlude = True |
| 176 | + rule = {u"name": rule_name, u"fields": rule_fields, u"contacts": rule_contacts, u"exclude": rule_exlude} |
| 177 | + rules.append(rule) |
| 178 | + |
| 179 | + logging.info("Generated " + str(len(rules)) + " alerta rules from contact information") |
| 180 | + return rules |
| 181 | + |
| 182 | + |
| 183 | +def get_gocdb(api_url, ca_bundle, hostcert, hostkey, verify): |
| 184 | + """Http Rest call to gocdb-api to get xml contact information |
| 185 | +
|
| 186 | + Args: |
| 187 | + api_url: str. Gocdb url call |
| 188 | + ca_bundle: str. CA bundle file |
| 189 | + hostcert: str. Host certificate file |
| 190 | + hostkey: str. Host key file |
| 191 | + verify: str. path to a ca_bundle for verification - if available |
| 192 | +
|
| 193 | + Return: |
| 194 | + str: gocdb-api xml response |
| 195 | + """ |
| 196 | + |
| 197 | + # If verify is true replace it with ca_bundle path |
| 198 | + if verify: |
| 199 | + verify = ca_bundle |
| 200 | + |
| 201 | + logging.info("Requesting data from gocdb api: " + api_url) |
| 202 | + r = requests.get(api_url, cert=(hostcert, hostkey), verify=verify) |
| 203 | + |
| 204 | + if r.status_code == 200: |
| 205 | + logging.info("Gocdb data retrieval successful") |
| 206 | + return r.text.encode('utf-8').strip() |
| 207 | + |
| 208 | + return "" |
| 209 | + |
| 210 | + |
| 211 | +def write_rules(rules, outfile): |
| 212 | + """Writes alerta email rules to a specific output file |
| 213 | +
|
| 214 | + Args: |
| 215 | + rules: obj. json representation of alerta rules |
| 216 | + outfile: str. output filename path |
| 217 | + """ |
| 218 | + |
| 219 | + json_str = json.dumps(rules, indent=4) |
| 220 | + logging.info("Saving rule to file: " + outfile) |
| 221 | + with open(outfile, "w") as output_file: |
| 222 | + output_file.write(json_str) |
0 commit comments