|
| 1 | +import os |
| 2 | +from collections.abc import Generator |
| 3 | +from contextlib import contextmanager |
| 4 | +from tempfile import TemporaryDirectory |
| 5 | +from time import sleep |
| 6 | +from typing import NamedTuple |
| 7 | + |
| 8 | +import polars as pl |
| 9 | +from selenium import webdriver |
| 10 | +from selenium.webdriver.chrome.options import Options |
| 11 | +from selenium.webdriver.chrome.service import Service |
| 12 | +from selenium.webdriver.common.by import By |
| 13 | +from selenium.webdriver.support import expected_conditions as EC |
| 14 | +from selenium.webdriver.support.ui import Select, WebDriverWait |
| 15 | + |
# Default output directory: an "outputs" folder under the working directory,
# resolved once at import time.
# NOTE(review): no function visible in this file reads this constant (downloads
# go to a temporary directory instead) — confirm it is used elsewhere.
DOWNLOAD_PATH = os.path.join(os.getcwd(), "outputs")
| 17 | + |
| 18 | + |
class ScrapingContext(NamedTuple):
    """Pair a browser session with the directory it downloads files into."""

    # Chrome WebDriver instance used to drive the site.
    driver: webdriver.Chrome
    # Directory the browser was configured to save downloads to.
    download_path: str
| 22 | + |
| 23 | + |
@contextmanager
def create_context() -> Generator[ScrapingContext, None, None]:
    """Yield a scraping context backed by a throwaway download directory.

    A temporary directory is created, a Chrome driver is configured to
    download into it, and both are handed to the caller as a
    ``ScrapingContext``. On exit (normal or via an exception) the browser is
    shut down first and the temporary directory is removed afterwards.

    Yields:
        ScrapingContext: the live driver and its download directory.
    """
    with TemporaryDirectory() as temp_dir:
        driver = create_driver_(temp_dir)
        try:
            yield ScrapingContext(driver, temp_dir)
        finally:
            # Quit the browser before the directory is deleted; otherwise the
            # Chrome/chromedriver processes outlive the context and may still
            # hold files inside the directory being removed.
            driver.quit()
| 28 | + |
| 29 | + |
def create_driver_(download_path: str) -> webdriver.Chrome:
    """Build a headless Chrome driver that saves downloads to *download_path*.

    Args:
        download_path: Directory Chrome should write downloaded files to.

    Returns:
        A started ``webdriver.Chrome`` instance; the caller is responsible for
        calling ``quit()`` on it.
    """
    # Split only for line length; the resulting value is a single string.
    user_agent = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36"
    )

    options = Options()
    for argument in (
        "--headless",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        f"user-agent={user_agent}",
    ):
        options.add_argument(argument)

    options.add_experimental_option(
        "prefs",
        {
            "download.default_directory": download_path,
            "download.prompt_for_download": False,
            # Chrome's preference is namespaced under "download."; the bare
            # "directory_upgrade" key is not a recognised preference.
            "download.directory_upgrade": True,
        },
    )

    # Discard chromedriver's log output.
    # NOTE(review): newer Selenium releases replace `log_path` with
    # `log_output` — confirm against the pinned Selenium version.
    service = Service(log_path=os.devnull)
    return webdriver.Chrome(options=options, service=service)
| 48 | + |
| 49 | + |
def get_electric_tariffs(context: ScrapingContext, state: str, utility: str, schedule: str) -> pl.DataFrame:
    """Download and parse the tariff spreadsheet for one state/utility/schedule.

    Opens the electric entry page, picks the "benchmark" report input, selects
    the requested state, utility and schedule, triggers the Excel export and
    parses the downloaded workbook.

    Args:
        context: Driver and download directory to work with.
        state: Visible text of the state option to select.
        utility: Visible text of the utility option to select.
        schedule: Visible text of the schedule option to select.

    Returns:
        The parsed spreadsheet as returned by ``read_excel``.

    Raises:
        ValueError: If one of the requested options is not offered by its
            dropdown (raised by the ``select_*`` helpers).
    """
    driver = context.driver

    # NOTE(review): this override is evaluated in the document that is current
    # *before* navigation, so it presumably does not apply to the page loaded
    # by the `get` below — confirm whether it is still needed.
    driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
    driver.get("https://secure.rateacuity.com/RateAcuity/ElecEntry/IndexViews")

    benchmark_input = WebDriverWait(driver, 10).until(
        EC.element_to_be_clickable((By.XPATH, "//input[@id='report' and @value='benchmark']"))
    )
    benchmark_input.click()

    select_state(driver, state)
    select_utility(driver, utility)
    select_schedule(driver, schedule)

    filepath = download_excel(context)
    try:
        return read_excel(filepath)
    finally:
        # Remove the downloaded workbook even when parsing fails, so a later
        # download in the same directory is not confused with this one.
        os.unlink(filepath)
| 69 | + |
| 70 | + |
def read_excel(filepath: str) -> pl.DataFrame:
    """Parse a downloaded tariff workbook into a cleaned DataFrame.

    The sheet is first read without a header to locate the row whose first
    cell contains "Component Description"; the sheet is then re-read using
    that row as the header. Cells that are blank after stripping whitespace
    become null, and rows with a null in either of the first two columns are
    dropped.

    Args:
        filepath: Path to the ``.xlsx`` file.

    Returns:
        The cleaned table.

    Raises:
        ValueError: If no row starts with a "Component Description" cell.
    """
    raw_data = pl.read_excel(filepath, engine="calamine", has_header=False)

    # Cells above the header may be empty (None) or non-text, so only string
    # cells are tested for the marker; `in` on None would raise TypeError.
    header_row_index = next(
        (
            index
            for index, row in enumerate(raw_data.iter_rows())
            if row and isinstance(row[0], str) and "Component Description" in row[0]
        ),
        None,
    )
    if header_row_index is None:
        raise ValueError(f"No 'Component Description' header row found in {filepath}")

    df = pl.read_excel(filepath, engine="calamine", read_options={"header_row": header_row_index})

    # Normalise blank / whitespace-only cells to null in every column.
    df = df.with_columns(
        [
            pl.when(pl.col(name).cast(pl.Utf8).str.strip_chars() == "")
            .then(None)
            .otherwise(pl.col(name))
            .alias(name)
            for name in df.columns
        ]
    )

    # Keep only rows that have values in both of the first two columns.
    first_column, second_column = df.columns[0], df.columns[1]
    return df.filter(pl.col(first_column).is_not_null() & pl.col(second_column).is_not_null())
| 83 | + |
| 84 | + |
def download_excel(context: ScrapingContext) -> str:
    """Click "Create Excel Spreadsheet" and wait for the workbook to appear.

    Args:
        context: Driver and the directory it downloads into.

    Returns:
        Full path of the newly downloaded ``.xlsx`` file.

    Raises:
        TimeoutError: If no new ``.xlsx`` file shows up in the download
            directory within roughly 20 seconds of the click.
    """
    driver, download_path = context

    export_link = WebDriverWait(driver, 10).until(
        EC.element_to_be_clickable((By.XPATH, '//a[text()="Create Excel Spreadsheet"]'))
    )

    # Snapshot the directory BEFORE clicking: a download that finishes quickly
    # would otherwise already be part of the baseline and never be detected.
    already_present = _get_xlsx(download_path)
    export_link.click()

    # Poll about once per second. Set difference (not symmetric difference)
    # is used so that only files that were added count as the download.
    attempts_left = 20
    new_files = _get_xlsx(download_path) - already_present
    while not new_files and attempts_left:
        sleep(1)
        attempts_left -= 1
        new_files = _get_xlsx(download_path) - already_present

    if not new_files:
        raise TimeoutError(f"No new .xlsx file appeared in {download_path} after clicking the export link")

    filename = next(iter(new_files))
    print("Filename:", filename)
    return os.path.join(download_path, filename)
| 101 | + |
| 102 | + |
def _get_xlsx(folder) -> set[str]:
    """Return the names of entries in *folder* whose name ends with ``.xlsx``.

    Only the bare entry names are returned (as given by ``os.listdir``), not
    full paths. The suffix comparison is case-sensitive.
    """
    entries = os.listdir(folder)
    return {name for name in entries if name.endswith(".xlsx")}
| 105 | + |
| 106 | + |
def _select_option(driver: webdriver.Chrome, element_id: str, label: str, value: str) -> None:
    """Select *value* by visible text in the dropdown with id *element_id*.

    Waits up to 10 seconds for the dropdown element to be present, checks
    that *value* matches the stripped text of one of its options, and selects
    it.

    Args:
        driver: Browser session to operate on.
        element_id: DOM id of the ``<select>`` element.
        label: Human-readable name of the field, used in the error message.
        value: Visible option text to select.

    Raises:
        ValueError: If *value* is not among the dropdown's option texts.
    """
    dropdown = WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, element_id)))
    # NOTE(review): the options are read as soon as the element is present; if
    # the page fills them in asynchronously this may see an incomplete list —
    # confirm against the site's behaviour.
    option_texts = [option.text.strip() for option in dropdown.find_elements(By.TAG_NAME, "option")]
    if value not in option_texts:
        raise ValueError(f"{label} {value} is invalid. Available options are: {option_texts}")
    Select(dropdown).select_by_visible_text(value)


def select_state(driver: webdriver.Chrome, state: str):
    """Select *state* in the "StateSelect" dropdown.

    Raises:
        ValueError: If *state* is not one of the offered options.
    """
    _select_option(driver, "StateSelect", "State", state)


def select_utility(driver: webdriver.Chrome, utility: str):
    """Select *utility* in the "UtilitySelect" dropdown.

    Raises:
        ValueError: If *utility* is not one of the offered options.
    """
    _select_option(driver, "UtilitySelect", "Utility", utility)


def select_schedule(driver: webdriver.Chrome, schedule: str):
    """Select *schedule* in the "ScheduleSelect" dropdown.

    Raises:
        ValueError: If *schedule* is not one of the offered options.
    """
    _select_option(driver, "ScheduleSelect", "Schedule", schedule)
| 135 | + |
| 136 | + |
def login(driver: webdriver.Chrome, email_address: str, password: str):
    """Submit the portal's login form with the given credentials.

    Navigates to the login page, types *email_address* into the "UserName"
    field and *password* into the "Password" field, then clicks the "Log in"
    submit input. Each element lookup waits up to 10 seconds.
    """
    driver.get("https://secure.rateacuity.com/RateAcuityPortal/Account/Login")

    # One wait object is reused; each `until` call gets its own 10 s budget.
    wait = WebDriverWait(driver, 10)

    username_field = wait.until(EC.presence_of_element_located((By.ID, "UserName")))
    username_field.send_keys(email_address)

    password_field = wait.until(EC.presence_of_element_located((By.ID, "Password")))
    password_field.send_keys(password)

    submit_button = wait.until(
        EC.element_to_be_clickable((By.XPATH, "//input[@type='submit' and @value='Log in']"))
    )
    submit_button.click()
0 commit comments