-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathusps_fun.py
More file actions
332 lines (295 loc) · 12.8 KB
/
usps_fun.py
File metadata and controls
332 lines (295 loc) · 12.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
import os, json, time, logging
from typing import List, Dict, Optional, Tuple, Union
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
# User-Agent identifying this client to Overpass operators (per usage policy).
AGENT = "USPS-OSM-StateDownloader/1.1 (contact: you@example.com)"

# Public Overpass API mirrors, tried in order when one fails or throttles.
OVERPASS_ENDPOINTS = [
    "https://overpass-api.de/api/interpreter",
    "https://overpass.kumi.systems/api/interpreter",
    "https://overpass.openstreetmap.fr/api/interpreter",
]

# (postal abbreviation, full name) for the 50 states plus DC.
STATES = [
    ("AL", "Alabama"), ("AK", "Alaska"), ("AZ", "Arizona"), ("AR", "Arkansas"),
    ("CA", "California"), ("CO", "Colorado"), ("CT", "Connecticut"), ("DE", "Delaware"),
    ("DC", "District of Columbia"),
    ("FL", "Florida"), ("GA", "Georgia"), ("HI", "Hawaii"), ("ID", "Idaho"),
    ("IL", "Illinois"), ("IN", "Indiana"), ("IA", "Iowa"), ("KS", "Kansas"),
    ("KY", "Kentucky"), ("LA", "Louisiana"), ("ME", "Maine"), ("MD", "Maryland"),
    ("MA", "Massachusetts"), ("MI", "Michigan"), ("MN", "Minnesota"), ("MS", "Mississippi"),
    ("MO", "Missouri"), ("MT", "Montana"), ("NE", "Nebraska"), ("NV", "Nevada"),
    ("NH", "New Hampshire"), ("NJ", "New Jersey"), ("NM", "New Mexico"), ("NY", "New York"),
    ("NC", "North Carolina"), ("ND", "North Dakota"), ("OH", "Ohio"), ("OK", "Oklahoma"),
    ("OR", "Oregon"), ("PA", "Pennsylvania"), ("RI", "Rhode Island"), ("SC", "South Carolina"),
    ("SD", "South Dakota"), ("TN", "Tennessee"), ("TX", "Texas"), ("UT", "Utah"),
    ("VT", "Vermont"), ("VA", "Virginia"), ("WA", "Washington"), ("WV", "West Virginia"),
    ("WI", "Wisconsin"), ("WY", "Wyoming")
]

# US territories that also carry ISO3166-2 codes in OSM.
TERRITORIES = [
    ("PR", "Puerto Rico"), ("GU", "Guam"), ("VI", "United States Virgin Islands"),
    ("AS", "American Samoa"), ("MP", "Northern Mariana Islands")
]

# ISO3166-2 codes used by OSM boundary relations. For every US state and
# territory the code is simply "US-" + postal abbreviation, so derive the
# mapping instead of hand-maintaining 56 entries.
ISO2_BY_ABBR = {abbr: f"US-{abbr}" for abbr, _ in STATES + TERRITORIES}
# ---------------------------------------------------
def _setup_logging(verbose: bool):
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s", level=level)
def _build_overpass_query_iso(iso_code: str, strict_usps: bool, timeout: int):
if strict_usps:
filt = '''
node["amenity"="post_office"]["operator"~"United States Postal Service|USPS"](area.a);
way["amenity"="post_office"]["operator"~"United States Postal Service|USPS"](area.a);
relation["amenity"="post_office"]["operator"~"United States Postal Service|USPS"](area.a);
node["amenity"="post_office"]["brand"~"United States Postal Service|USPS"](area.a);
way["amenity"="post_office"]["brand"~"United States Postal Service|USPS"](area.a);
relation["amenity"="post_office"]["brand"~"United States Postal Service|USPS"](area.a);
'''
else:
filt = '''
node["amenity"="post_office"](area.a);
way["amenity"="post_office"](area.a);
relation["amenity"="post_office"](area.a);
'''
return f"""
[out:json][timeout:{timeout}];
rel["boundary"="administrative"]["ISO3166-2"="{iso_code}"];
map_to_area->.a;
(
{filt}
);
out center tags;
"""
def _overpass_request(query: str, endpoints: List[str], max_retries: int = 6):
    """POST an Overpass QL query, rotating endpoints with retry/backoff.

    Transient failures (HTTP 429/5xx, connection/timeout errors) are
    retried across every endpoint up to max_retries rounds, with
    exponential backoff capped at 30s. Any other HTTP error status
    (e.g. 400 for a malformed query) is raised immediately — retrying
    it would just repeat the same failure on every endpoint.

    Returns the decoded JSON body on the first 200 response.
    Raises requests.HTTPError for non-retryable statuses, or
    RuntimeError once all retries are exhausted.
    """
    headers = {"User-Agent": AGENT, "Accept-Encoding": "gzip, deflate"}
    delay = 2.0
    last_err = None
    for _ in range(max_retries):
        for url in endpoints:
            try:
                r = requests.post(url, data={"data": query}, headers=headers, timeout=120)
                if r.status_code == 200:
                    return r.json()
                if r.status_code in (429, 500, 502, 503, 504):
                    # Throttled or server-side hiccup: back off, try next mirror.
                    last_err = RuntimeError(f"{url} {r.status_code}: {r.text[:200]}")
                    time.sleep(delay)
                    continue
                r.raise_for_status()
            except requests.HTTPError:
                # Non-retryable status (4xx other than 429): fail fast instead
                # of letting the RequestException handler below swallow it.
                raise
            except requests.RequestException as e:
                last_err = e
                time.sleep(delay)
        delay = min(30.0, delay * 1.7)
    raise RuntimeError(f"Overpass request failed after retries. Last error: {last_err}")
def _elements_to_df(elements: List[dict]):
rows = []
for el in elements:
et = el.get("type"); eid = el.get("id")
tags = el.get("tags", {}) or {}
if et == "node":
lat, lon = el.get("lat"), el.get("lon")
else:
c = el.get("center") or {}; lat, lon = c.get("lat"), c.get("lon")
if lat is None or lon is None:
continue
rows.append({
"osm_id": f"{et}/{eid}",
"osm_type": et,
"osm_numeric_id": eid,
"name": tags.get("name"),
"operator": tags.get("operator"),
"brand": tags.get("brand"),
"ref_usps": tags.get("ref:usps") or tags.get("usps:id"),
"addr_housenumber": tags.get("addr:housenumber"),
"addr_street": tags.get("addr:street"),
"addr_unit": tags.get("addr:unit"),
"addr_city": tags.get("addr:city"),
"addr_state": tags.get("addr:state"),
"addr_postcode": tags.get("addr:postcode"),
"phone": tags.get("phone"),
"website": tags.get("website"),
"opening_hours": tags.get("opening_hours"),
"source_tag": tags.get("source"),
"lon": float(lon), "lat": float(lat),
"all_tags_json": json.dumps(tags, ensure_ascii=False)
})
df = pd.DataFrame(rows)
if not df.empty:
df = df.drop_duplicates(subset=["osm_id"]).reset_index(drop=True)
return df
def _write_gpkg_points(df: pd.DataFrame, out_path: str, layer: str = "points"):
    """Write a lon/lat DataFrame to a GeoPackage layer.

    Builds WGS84 point geometry from the `lon`/`lat` columns, replaces
    any existing file at out_path, and returns the GeoDataFrame.
    Returns None (writing nothing) when df is missing or empty.
    """
    if df is None or df.empty:
        return None
    geometry = [Point(lon, lat) for lon, lat in zip(df["lon"], df["lat"])]
    gdf = gpd.GeoDataFrame(df, geometry=geometry, crs="EPSG:4326")
    if os.path.exists(out_path):
        os.remove(out_path)  # GPKG append semantics vary; start clean
    gdf.to_file(out_path, layer=layer, driver="GPKG")
    return gdf
def _run_one_state(abbr: str,
                   out_dir: str,
                   strict_usps: bool,
                   timeout: int,
                   endpoints: List[str],
                   skip_existing: bool,
                   verbose: bool,
                   return_gdfs: bool):
    """Fetch post offices for one state and persist them to a GPKG.

    Returns a dict {"abbr", "path", "gdf"}. On any failure (missing ISO
    code, Overpass error, no features) path and gdf are None; gdf is
    also None when return_gdfs is False.
    """
    out_path = os.path.join(out_dir, f"usps_{abbr}.gpkg")

    # Reuse a previously written file when asked to.
    if skip_existing and os.path.exists(out_path):
        if verbose:
            logging.info(f"[{abbr}] exists, skipping: {out_path}")
        existing = gpd.read_file(out_path, layer="points") if return_gdfs else None
        return {"abbr": abbr, "path": out_path, "gdf": existing}

    iso = ISO2_BY_ABBR.get(abbr.upper())
    if not iso:
        logging.error(f"[{abbr}] missing ISO3166-2 map entry.")
        return {"abbr": abbr, "path": None, "gdf": None}

    query = _build_overpass_query_iso(iso, strict_usps, timeout)
    try:
        payload = _overpass_request(query, endpoints)
    except Exception as err:
        logging.error(f"[{abbr}] Overpass error: {err}")
        return {"abbr": abbr, "path": None, "gdf": None}

    df = _elements_to_df(payload.get("elements", []))
    if df.empty:
        logging.warning(f"[{abbr}] No features returned.")
        return {"abbr": abbr, "path": None, "gdf": None}

    gdf = _write_gpkg_points(df, out_path, layer="points")
    if verbose and gdf is not None:
        logging.info(f"[{abbr}] wrote {len(gdf):,} points {out_path}")
    return {"abbr": abbr, "path": out_path, "gdf": gdf if return_gdfs else None}
def pull_usps(
        states: Union[str, List[str]] = "ALL",
        out_dir: str = "./usps_out",
        strict_usps: bool = False,
        include_territories: bool = False,
        merge: bool = False,
        merge_path: str = "./usps_points_us.gpkg",
        timeout: int = 60,
        skip_existing: bool = False,
        overpass_endpoints: Optional[List[str]] = None,
        workers: int = 1,
        verbose: bool = False,
        return_gdfs: bool = False):
    """
    Fetch OSM post offices state-by-state and write GPKGs; optionally return GeoDataFrames.

    Parameters:
        states: "ALL", a comma-separated string ("WA,OR,CA"), or a list of
            abbreviations. Matching is case-insensitive.
        out_dir: directory for per-state GPKG files (created if absent).
        strict_usps: restrict to features whose operator/brand matches USPS.
        include_territories: also process PR/GU/VI/AS/MP.
        merge / merge_path: additionally write one merged GPKG of all points.
        timeout: Overpass server-side timeout (seconds) per query.
        skip_existing: reuse per-state files already on disk.
        overpass_endpoints: override the default mirror list.
        workers: thread count, clamped to 1..12 to stay polite to mirrors.
        verbose: DEBUG logging plus progress messages.
        return_gdfs: keep per-state GeoDataFrames in the result.

    Raises ValueError when the states filter matches nothing.

    Returns dict:
      {
        "state_paths": List[str],          # written per-state GPKG paths
        "merged_path": Optional[str],      # merged GPKG path if merge=True
        "per_state_gdfs": Optional[dict],  # { 'WA': GeoDataFrame, ... } if return_gdfs=True
        "merged_gdf": Optional[GeoDataFrame]  # merged GeoDataFrame if merge=True & return_gdfs=True
      }
    """
    _setup_logging(verbose)
    os.makedirs(out_dir, exist_ok=True)
    endpoints = overpass_endpoints or OVERPASS_ENDPOINTS

    # Build target (abbr, name) list, filtered by the states argument.
    target = STATES.copy()
    if include_territories:
        target += TERRITORIES
    if isinstance(states, str) and states != "ALL":
        keep = {s.strip().upper() for s in states.split(",") if s.strip()}
        target = [(a, n) for (a, n) in target if a in keep]
    elif isinstance(states, list):
        keep = {s.strip().upper() for s in states if isinstance(s, str)}
        target = [(a, n) for (a, n) in target if a in keep]
    if not target:
        raise ValueError("No matching states to process.")

    # Process (optionally in parallel). Clamp workers for mirror politeness.
    workers = max(1, min(int(workers), 12))
    if verbose:
        logging.info(f"Fetching {len(target)} states with workers={workers} (strict_usps={strict_usps})")

    results = []
    if workers == 1:
        for abbr, _ in target:
            results.append(
                _run_one_state(abbr, out_dir, strict_usps, timeout, endpoints, skip_existing, verbose, return_gdfs)
            )
            time.sleep(0.3)  # light pacing between sequential requests
    else:
        with ThreadPoolExecutor(max_workers=workers) as ex:
            futs = {
                ex.submit(
                    _run_one_state, abbr, out_dir, strict_usps, timeout, endpoints, skip_existing, verbose, return_gdfs
                ): abbr for abbr, _ in target
            }
            # No sleep while collecting: every request is already in flight,
            # so sleeping here would only delay result collection.
            for fut in as_completed(futs):
                results.append(fut.result())

    # Collect outputs.
    state_paths = [r["path"] for r in results if r["path"]]
    per_state_gdfs = {r["abbr"]: r["gdf"] for r in results if r["gdf"] is not None} if return_gdfs else None

    def _write_merged(frames):
        # Concatenate per-state frames and (re)write the merged GPKG.
        # Returns (path, gdf) or (None, None) when there is nothing to merge.
        if not frames:
            return None, None
        merged = gpd.GeoDataFrame(pd.concat(frames, ignore_index=True), geometry="geometry", crs="EPSG:4326")
        if os.path.exists(merge_path):
            os.remove(merge_path)
        merged.to_file(merge_path, layer="usps_points_us", driver="GPKG")
        return merge_path, merged

    merged_path = None
    merged_gdf = None
    if merge and state_paths:
        if return_gdfs and per_state_gdfs:
            # Merge in-memory: the GeoDataFrames are already loaded.
            frames = [g for g in per_state_gdfs.values() if g is not None and not g.empty]
        else:
            # Merge from disk, skipping files that fail to read.
            frames = []
            for p in state_paths:
                try:
                    g = gpd.read_file(p, layer="points")
                    if not g.empty:
                        frames.append(g)
                except Exception as e:
                    logging.warning(f"Skip merge {p}: {e}")
        merged_path, merged_gdf = _write_merged(frames)

    return {
        "state_paths": state_paths,
        "merged_path": merged_path,
        "per_state_gdfs": per_state_gdfs,
        "merged_gdf": merged_gdf
    }
#--------------------
# Usage
#--------------------
def _demo() -> None:
    """Example runs. Network-heavy, so only executed under __main__ —
    previously these calls fired on plain import of the module."""
    out = pull_usps(
        states="WA,OR,CA",
        out_dir="./usps_out",
        merge=True,
        merge_path="./usps_points_us.gpkg",
        include_territories=False,
        strict_usps=False,
        skip_existing=False,
        return_gdfs=True,
        verbose=True,
    )
    # Result fields (the bare expressions here were REPL-style no-ops):
    #   out["state_paths"]          list of per-state GPKG paths
    #   out["merged_path"]          merged GPKG path (if merge=True)
    #   out["merged_gdf"]           GeoDataFrame of all points (if return_gdfs=True & merge=True)
    #   out["per_state_gdfs"]['CA'] dict of GeoDataFrames keyed by state abbr (if return_gdfs=True)
    out["merged_gdf"].plot()

    # Full-country pull, parallelized across 5 workers.
    pull_usps(
        out_dir="./usps_out_all",
        merge=True,
        merge_path="./usps_points_us_all.gpkg",
        include_territories=False,
        strict_usps=False,
        skip_existing=False,
        return_gdfs=True,
        verbose=True,
        workers=5,
    )


if __name__ == "__main__":
    _demo()