|
1 | 1 | import copy |
2 | | -import os |
3 | | -import polars |
4 | 2 | from schemas import ( |
5 | 3 | facilities_schema, |
6 | 4 | resp_info_schema, |
7 | 5 | ) |
8 | 6 | import time |
9 | 7 | from urllib.parse import quote |
10 | 8 | from utils import ( |
11 | | - facility_sheet_header, |
12 | 9 | logger, |
13 | 10 | session, |
14 | 11 | ) |
15 | 12 | # ExternalDataEnricher class for enrichment logic |
16 | 13 |
|
17 | | -SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) |
# Rate limiting for API calls (seconds to sleep between consecutive requests).
NOMINATIM_DELAY = 1.0  # 1 second between requests as per OSM policy
WIKIPEDIA_DELAY = 0.5  # Be respectful to Wikipedia
|
23 | 19 |
|
class ExternalDataEnricher(object):
    """Enrich facility records with data from external sources.

    NOTE(review): the former ADP-sheet download/parse helpers were removed
    in this change, so the class keeps no per-instance state; all work
    happens in enrich_facility_data().
    """

    def __init__(self):
        # Intentionally empty: nothing to initialize after the sheet-loading
        # logic was dropped. Kept so callers constructing the enricher with
        # ExternalDataEnricher() continue to work unchanged.
        pass
74 | 23 |
|
75 | 24 | def enrich_facility_data(self, facilities_data: dict) -> dict: |
76 | 25 | start_time = time.time() |
77 | 26 | logger.info("Starting data enrichment with external sources...") |
78 | 27 | enriched_data = copy.deepcopy(facilities_schema) |
79 | 28 | total = len(facilities_data["facilities"]) |
| 29 | + processed = 0 |
80 | 30 |
|
81 | | - for index, facility in enumerate(facilities_data["facilities"]): |
| 31 | + for facility_id, facility in enumerate(facilities_data["facilities"]): |
82 | 32 | facility_name = facility["name"] |
83 | | - logger.info("Processing facility %s/%s: %s...", index + 1, total, facility_name) |
| 33 | + logger.info("Processing facility %s/%s: %s...", processed + 1, total, facility_name) |
84 | 34 | enriched_facility = copy.deepcopy(facility) |
85 | | - addr = facility["address"] |
86 | | - full_address = ( |
87 | | - f"{addr['street']} {addr['locality']}, {addr['administrative_area']} {addr['postal_code']}".upper() |
88 | | - ) |
89 | | - if full_address in self.adp_sheet_data: |
90 | | - row = self.adp_sheet_data[full_address] |
91 | | - logger.debug("Found additional data in the ADP sheet for %s", facility_name) |
92 | | - enriched_facility = self._update_from_sheet(facility, row) |
93 | | - else: |
94 | | - logger.debug("Just making sure no other facilities match...") |
95 | | - for sheet_row in self.adp_sheet_data.values(): |
96 | | - if facility_name.upper() == sheet_row["Name"].upper(): |
97 | | - logger.debug("Matching facility for %s", facility_name) |
98 | | - enriched_facility = self._update_from_sheet(facility, sheet_row) |
99 | | - break |
100 | 35 |
|
101 | 36 | # Wikipedia search # todo refactor to method |
102 | 37 | try: |
@@ -131,7 +66,8 @@ def enrich_facility_data(self, facilities_data: dict) -> dict: |
131 | 66 | enriched_facility["osm_result_url"] = "" |
132 | 67 | enriched_facility["osm_search_query"] = str(e) |
133 | 68 |
|
134 | | - enriched_data["facilities"].append(enriched_facility) # type: ignore [attr-defined] |
| 69 | + enriched_data["facilities"][facility_id] = enriched_facility # type: ignore [index] |
| 70 | + processed += 1 |
135 | 71 |
|
136 | 72 | logger.info("Data enrichment completed!") |
137 | 73 | enriched_data["enrich_runtime"] = time.time() - start_time |
|
0 commit comments