|
| 1 | +"""ATT Parser.""" |
| 2 | + |
| 3 | +import logging |
| 4 | +import re |
| 5 | +import string |
| 6 | +from typing import Dict, List |
| 7 | + |
| 8 | +import dateutil |
| 9 | +from bs4.element import ResultSet # type: ignore |
| 10 | +from circuit_maintenance_parser.errors import ParserError |
| 11 | +from circuit_maintenance_parser.parser import CircuitImpact, Html, Impact, Status, Xlsx |
| 12 | + |
| 13 | +logger = logging.getLogger(__name__) |
| 14 | + |
| 15 | +RE_EVENT = re.compile( |
| 16 | + r"Event ID: (.*)[ \n]" |
| 17 | + r"Customer Impact Description: (.*)[ \n]" |
| 18 | + r"Summary: (.*)[ \n]" |
| 19 | + r"Description: (.*)[ \n]" |
| 20 | + r"Business Risk: (.*)" |
| 21 | +) |
| 22 | +RE_MAINTENANCE_WINDOW_GMT = re.compile(r"Start Time: (.* GMT).*End Time: (.* GMT)") |
| 23 | +RE_MAINTENANCE_WINDOW_NO_TIMEZONE = re.compile(r"Start Time: (.*)[ \n]End Time: (.*)") |
| 24 | + |
| 25 | + |
| 26 | +class XlsxParserATT1(Xlsx): |
| 27 | + """Xlsx Parser for ATT file attachments.""" |
| 28 | + |
| 29 | + @staticmethod |
| 30 | + def parse_xlsx(records: List[Dict]) -> List[Dict]: |
| 31 | + """Parses ATT xlsx attachments.""" |
| 32 | + impact = Impact.OUTAGE |
| 33 | + account_name, circuit_id_key = get_account_and_circuit_id_key(records[0]) |
| 34 | + circuit_ids = [r[circuit_id_key] for r in records] |
| 35 | + if "Circuit/Asset" in records[0]: |
| 36 | + circuit_ids = [normalize_lec_circuit_id(cid) for cid in circuit_ids] |
| 37 | + circuits = [CircuitImpact(impact=impact, circuit_id=cid) for cid in circuit_ids] |
| 38 | + data = [ |
| 39 | + { |
| 40 | + "account": account_name, |
| 41 | + "circuits": circuits, |
| 42 | + } |
| 43 | + ] |
| 44 | + return data |
| 45 | + |
| 46 | + |
| 47 | +class HtmlParserATT1(Html): |
| 48 | + """Notifications Parser for ATT notifications.""" |
| 49 | + |
| 50 | + def parse_html(self, soup): |
| 51 | + """Parse ATT HTML notification.""" |
| 52 | + logger.debug("Parsing ATT HTML notification.") |
| 53 | + data = self.parse_p_tags(soup) |
| 54 | + data["start"] = self.dt2ts(data["start"]) |
| 55 | + data["end"] = self.dt2ts(data["end"]) |
| 56 | + data["status"] = Status.CONFIRMED |
| 57 | + return [data] |
| 58 | + |
| 59 | + @staticmethod |
| 60 | + def parse_p_tags(soup: ResultSet) -> Dict: |
| 61 | + """Parse <p> tags in HTML.""" |
| 62 | + data = {} |
| 63 | + p_tags = soup.find_all("p") |
| 64 | + |
| 65 | + for tag in p_tags: |
| 66 | + text = remove_unprintable(tag.text.strip()) |
| 67 | + |
| 68 | + if match := RE_EVENT.search(text): |
| 69 | + event_id, impact, summary, description, _ = match.groups() |
| 70 | + data["maintenance_id"] = event_id |
| 71 | + data["summary"] = f"{summary}: {impact} {description}" |
| 72 | + |
| 73 | + elif match := RE_MAINTENANCE_WINDOW_GMT.search(text): |
| 74 | + start_time_text, end_time_text = match.groups() |
| 75 | + data["start"] = dateutil.parser.parse(start_time_text) |
| 76 | + data["end"] = dateutil.parser.parse(end_time_text) |
| 77 | + |
| 78 | + elif match := RE_MAINTENANCE_WINDOW_NO_TIMEZONE.search(text): |
| 79 | + start_time_text, end_time_text = match.groups() |
| 80 | + data["start"] = dateutil.parser.parse(start_time_text + " GMT") |
| 81 | + data["end"] = dateutil.parser.parse(end_time_text + " GMT") |
| 82 | + |
| 83 | + return data |
| 84 | + |
| 85 | + |
| 86 | +def get_account_and_circuit_id_key(record: Dict) -> tuple[str, str]: |
| 87 | + """Return the account name and the key used to retrieve circuits IDs. |
| 88 | +
|
| 89 | + The key names may vary depending on the ATT business unit that initiated the notice. |
| 90 | + """ |
| 91 | + if account := record.get("Customer"): |
| 92 | + circuit_id_key = "Circuit/Asset" |
| 93 | + elif account := record.get("Customer Name"): |
| 94 | + circuit_id_key = "Circuit ID" |
| 95 | + elif account := record.get("Customer Names"): |
| 96 | + circuit_id_key = "Customer Circuit ID" |
| 97 | + else: |
| 98 | + raise ParserError("Could not parse 'Customer Name' and 'Circuit ID'.") |
| 99 | + |
| 100 | + return str(account), circuit_id_key |
| 101 | + |
| 102 | + |
| 103 | +def normalize_lec_circuit_id(circuit_id: str) -> str: |
| 104 | + """Standardize circuit IDs.""" |
| 105 | + circuit_id, *_ = circuit_id.split() |
| 106 | + circuit_id = re.sub(r"^0+", "", circuit_id) # Remove leading zeros. |
| 107 | + circuit_id = re.sub(r"0+$", "ATI", circuit_id) # Remove trailing zeros. |
| 108 | + return circuit_id |
| 109 | + |
| 110 | + |
| 111 | +def remove_unprintable(text: str) -> str: |
| 112 | + """Remove non-printing characters from text.""" |
| 113 | + return "".join(c for c in text if c in string.printable) |
0 commit comments