|
| 1 | +import argparse |
| 2 | +import math |
| 3 | +import os |
| 4 | +import typing as t |
| 5 | +from pathlib import Path |
| 6 | + |
| 7 | +import httpx |
| 8 | +from dotenv import load_dotenv |
| 9 | + |
| 10 | +load_dotenv() |
| 11 | +AIO_USER = os.environ.get("AIO_USER") |
| 12 | +if AIO_USER is None: |
| 13 | + raise RuntimeError("Could not locate 'AIO_USER' env var") |
| 14 | +AIO_KEY = os.environ.get("AIO_KEY") |
| 15 | +if AIO_KEY is None: |
| 16 | + raise RuntimeError("Could not locate 'AIO_KEY' env var") |
| 17 | +GEOAPIFY_KEY = os.environ.get("GEOAPIFY_KEY") |
| 18 | +if GEOAPIFY_KEY is None: |
| 19 | + raise RuntimeError("Could not locate 'GEOAPIFY_KEY' env var") |
| 20 | + |
| 21 | + |
| 22 | +SCREEN_RES = { |
| 23 | + "pyportal": (320, 240), |
| 24 | + "feather": (480, 320), |
| 25 | +} |
| 26 | + |
| 27 | +GEOAPIFY_API_URL_BASE = "https://maps.geoapify.com/v1/staticmap" |
| 28 | +AIO_URL_BASE = f"https://io.adafruit.com/api/v2/{AIO_USER}/integrations/image-formatter" |
| 29 | + |
| 30 | +DEFAULT_CENTER_LAT = 42.41 |
| 31 | +DEFAULT_CENTER_LON = -71.17 |
| 32 | +DEFAULT_GRID_WIDTH_MI = 15 |
| 33 | +MAP_STYLE = "klokantech-basic" |
| 34 | + |
| 35 | + |
| 36 | +# We have to copy over our helpers since we can't import from Skyportal without having the |
| 37 | +# CircuitPython modules installed |
| 38 | +def build_url(base: str, params: dict[str, t.Any]) -> str: |
| 39 | + """Build a url from the provided base & parameter(s).""" |
| 40 | + param_str = "&".join(f"{k}={v}" for k, v in params.items()) |
| 41 | + return f"{base}?{param_str}" |
| 42 | + |
| 43 | + |
| 44 | +def urlencode(url: str) -> str: |
| 45 | + """Encode any non-alphanumeric, non-digit, or chars that aren't `-` or `.` in the given URL.""" |
| 46 | + encoded_chars = [] |
| 47 | + for c in url: |
| 48 | + if any((c.isalpha(), c.isdigit(), (c in ("-", ".")))): |
| 49 | + encoded_chars.append(c) |
| 50 | + else: |
| 51 | + encoded_chars.append(f"%{ord(c):02X}") |
| 52 | + |
| 53 | + return "".join(encoded_chars) |
| 54 | + |
| 55 | + |
| 56 | +def build_bounding_box( |
| 57 | + screen_width: int, |
| 58 | + screen_height: int, |
| 59 | + map_center_lat: float, |
| 60 | + map_center_lon: float, |
| 61 | + grid_width_mi: int, |
| 62 | +) -> tuple[float, float, float, float]: |
| 63 | + """Calculate the bounding corners of a rectangular grid centered at the specified map center.""" |
| 64 | + earth_radius_km = 6378.1 |
| 65 | + |
| 66 | + center_lat_rad = math.radians(map_center_lat) |
| 67 | + center_lon_rad = math.radians(map_center_lon) |
| 68 | + grid_size_km = grid_width_mi * 1.6 |
| 69 | + |
| 70 | + # Calculate distance deltas |
| 71 | + ang_dist = grid_size_km / earth_radius_km |
| 72 | + d_lat = ang_dist |
| 73 | + d_lon = math.asin(math.sin(ang_dist) / math.cos(center_lat_rad)) |
| 74 | + |
| 75 | + # Scale rectangle height from the specified width |
| 76 | + aspect_ratio = screen_width / screen_height |
| 77 | + d_lon *= aspect_ratio |
| 78 | + |
| 79 | + # Calculate latitude bounds |
| 80 | + min_center_lat_rad = center_lat_rad - d_lat |
| 81 | + max_center_lat_rad = center_lat_rad + d_lat |
| 82 | + |
| 83 | + # Calculate longitude bounds |
| 84 | + min_center_lon_rad = center_lon_rad - d_lon |
| 85 | + max_center_lon_rad = center_lon_rad + d_lon |
| 86 | + |
| 87 | + # Convert from radians to degrees |
| 88 | + lat_min = math.degrees(min_center_lat_rad) |
| 89 | + lat_max = math.degrees(max_center_lat_rad) |
| 90 | + lon_min = math.degrees(min_center_lon_rad) |
| 91 | + lon_max = math.degrees(max_center_lon_rad) |
| 92 | + |
| 93 | + return lat_min, lat_max, lon_min, lon_max |
| 94 | + |
| 95 | + |
| 96 | +def get_base_map( |
| 97 | + screen_width: int, |
| 98 | + screen_height: int, |
| 99 | + center_lat: float, |
| 100 | + center_lon: float, |
| 101 | + grid_bounds: tuple[float, float, float, float], |
| 102 | +) -> None: |
| 103 | + """ |
| 104 | + Query Geoapify for the map image with the given parameters. |
| 105 | +
|
| 106 | + If successful, the map tile image is saved to `./tmp/generated_map.bmp`. |
| 107 | +
|
| 108 | + An attempt is made to query the Geoapify API for a map tile at the location specified by the |
| 109 | + specified mapping constants. |
| 110 | +
|
| 111 | + Since Geoapify returns a PNG & CircuitPython needs a BMP, this image is sent to Adafruit IO for |
| 112 | + resizing & conversion. |
| 113 | +
|
| 114 | + NOTE: The request sent to Geoapify specifies an image size 2x that of the display in order to |
| 115 | + shrink down the labels on the final image for better visualization. |
| 116 | +
|
| 117 | + NOTE: AIO image formatter queries are rate limited to 1 per minute. |
| 118 | + """ |
| 119 | + lat_min, lat_max, lon_min, lon_max = grid_bounds |
| 120 | + map_params = { |
| 121 | + "apiKey": GEOAPIFY_KEY, |
| 122 | + "style": MAP_STYLE, |
| 123 | + "format": "png", |
| 124 | + "center": f"lonlat:{center_lon},{center_lat}", |
| 125 | + "area": f"rect:{lon_min},{lat_min},{lon_max},{lat_max}", |
| 126 | + "width": screen_width * 2, |
| 127 | + "height": screen_height * 2, |
| 128 | + } |
| 129 | + map_query_url = build_url(GEOAPIFY_API_URL_BASE, map_params) |
| 130 | + |
| 131 | + adaIO_params = { |
| 132 | + "x-aio-key": AIO_KEY, |
| 133 | + "width": screen_width, |
| 134 | + "height": screen_height, |
| 135 | + "output": "BMP16", |
| 136 | + "url": urlencode(map_query_url), # Encode so AIO doesn't eat the Geoapify params |
| 137 | + } |
| 138 | + adaIO_query_url = build_url(AIO_URL_BASE, adaIO_params) |
| 139 | + |
| 140 | + target_dir = Path("./tmp") |
| 141 | + target_dir.mkdir(exist_ok=True) |
| 142 | + dest = target_dir / "generated_map.bmp" |
| 143 | + |
| 144 | + with httpx.Client() as client: |
| 145 | + print("Sending image request to AIO...") |
| 146 | + r = client.get(adaIO_query_url) |
| 147 | + |
| 148 | + if r.status_code != httpx.codes.OK: |
| 149 | + raise RuntimeError(f"Bad response received from AIO: {r.status_code}, {r.text}") |
| 150 | + |
| 151 | + with dest.open("wb") as f: |
| 152 | + for c in r.iter_bytes(chunk_size=2048): |
| 153 | + f.write(c) |
| 154 | + |
| 155 | + print("Map image successfully written to '{dest}'!") |
| 156 | + |
| 157 | + |
| 158 | +def main() -> None: # noqa: D103 |
| 159 | + description = "Query Geoapify & AIO for a map tile with the given parameters." |
| 160 | + epilog = "NOTE: AIO image formatter queries are rate limited to 1 per minute." |
| 161 | + parser = argparse.ArgumentParser(description=description, epilog=epilog) |
| 162 | + parser.add_argument("target", type=str, choices=("pyportal", "feather")) |
| 163 | + parser.add_argument("--center_lat", type=float, default=DEFAULT_CENTER_LAT) |
| 164 | + parser.add_argument("--center_lon", type=float, default=DEFAULT_CENTER_LON) |
| 165 | + parser.add_argument("--grid_width", type=int, default=DEFAULT_GRID_WIDTH_MI) |
| 166 | + args = parser.parse_args() |
| 167 | + |
| 168 | + width, height = SCREEN_RES[args.target] |
| 169 | + map_bbox = build_bounding_box( |
| 170 | + screen_width=width, |
| 171 | + screen_height=height, |
| 172 | + map_center_lat=args.center_lat, |
| 173 | + map_center_lon=args.center_lon, |
| 174 | + grid_width_mi=args.grid_width, |
| 175 | + ) |
| 176 | + |
| 177 | + get_base_map( |
| 178 | + screen_width=width, |
| 179 | + screen_height=height, |
| 180 | + center_lat=args.center_lat, |
| 181 | + center_lon=args.center_lon, |
| 182 | + grid_bounds=map_bbox, |
| 183 | + ) |
| 184 | + |
| 185 | + |
| 186 | +if __name__ == "__main__": |
| 187 | + main() |
0 commit comments