-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathBulkDownloader.py
More file actions
990 lines (818 loc) · 31.3 KB
/
BulkDownloader.py
File metadata and controls
990 lines (818 loc) · 31.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
#!/usr/bin/python3
# BulkDownloader.py
# Copyright 2025 Theta Informatics LLC
# Apache 2.0 license
#
# Given a lat,lon coordinates that make a large bounding
# box, bulk download slightly-overlapping DTM tiles of particular type
# pass in upper-left, lower-right coordinates, DTM type, verbose flag
# read OPENATHENA_API_KEY from env, work with a OA Core RESTful API
# server to fetch the tiles
# --bounds 12.35 41.8 12.65 42 left bottom right top
# option for dry-run for printing out what script *would* do
# check API access to Core API server first to establish connectivity
# check admin api access to core api server to establish permission
# for calculating tiles, don't clip the edge tiles so that they all stay inside
# our bounding box. Instead, all tiles same size and they may spill over the bounding
# box slightly
# testing
# ft sill bounding box is: -98.756447 34.632163 -98.270302 34.72838
# HI island including some water: -158.293488 21.244838 -157.618927 21.705921
# Scotland -3.857574337162078 56.46782643095685 -2.645416022278369 56.97819241162202
# Poland 16.892578080296516 52.3296326113996 17.680114544928074 52.77053622518159
# Philmont -105.364563 36.244698 -104.689819 36.664288
# Taiwan 119.841064 21.805578 122.338623 25.443771
# S Taiwain 120.113159 21.901514 121.660034 23.897808
# N Italy 9.137694954872131 44.119141749106944 12.408691763877867 45.00313340559234
# right now with Core 0.5.1 no need for pacing or rate limiting as the
# cache directory indexing is inefficient and extremely slow
import math
import argparse
from dataclasses import dataclass
import json
import os
from urllib.parse import urlparse
from urllib.request import urlopen
from urllib.error import URLError, HTTPError
import sys
from typing import Callable
EARTH_RADIUS_KM = 6371.0
# Approximate length of 1 degree of latitude in km
KM_PER_DEG_LAT = 111.32
# ---- Defaults ----
DEFAULT_TILE_SIZE_M = 10_000.0 # 10 km tiles by default
# ---- ASCII box layout ----
LINE_WIDTH = 79 # assume ~80-char terminal
PLUS_LEFT_COL = 10
PLUS_RIGHT_COL = LINE_WIDTH - 10
OPENATHENA_API_KEY = "needApiKey"
VERBOSE=False
# LEFT = west-most longitude (lon_min)
# RIGHT = east-most longitude (lon_max)
# BOTTOM = south-most latitude (lat_min)
# TOP = north-most latitude (lat_max)
@dataclass
class Tile:
south: float
west: float
north: float
east: float
width_m: float
height_m: float
row: int
col: int
center_lat: float
center_lon: float
# ------------------------------------------------------------------------
# make an API call to given URL and then return result code
# ignoring any returned body for now
from urllib.request import urlopen, Request
from urllib.error import URLError, HTTPError
def get_url_status_code(
url: str,
method: str = "GET",
timeout: float = 15.0,
data: bytes | None = None,
headers: dict | None = None,
) -> int:
"""
Make a basic HTTP(S) request to `url` with the given method and
return a status code.
Args:
url: Full URL to call.
method: HTTP method, e.g. "GET", "POST", "PUT", "DELETE".
timeout: Timeout in seconds.
data: Optional request body as bytes (e.g., b'{}' for JSON).
headers: Optional dict of HTTP headers.
Returns:
- On success: the HTTP status code (e.g., 200, 301, 404, 500, ...).
- On HTTPError: the HTTP error code (4xx, 5xx).
- On network errors / DNS / timeout: -1
- On unexpected exceptions: -2
"""
if headers is None:
headers = {}
method = method.upper().strip()
# Build Request with explicit method (Python 3.3+ supports this)
req = Request(url, data=data, headers=headers, method=method)
try:
with urlopen(req, timeout=timeout) as resp:
return resp.getcode()
except HTTPError as e:
# Server responded but with an HTTP error (4xx/5xx)
return e.code
except URLError:
# DNS failure, refused connection, timeout, etc.
return -1
except Exception:
# Anything else unexpected
return -2
# ------------------------------------------------------------------------
# ------------------------------------------------------------------------
# make tiles with spillover; that is, if they exceed our bounding box just
# slightly, don't worry about it. Tiles are uniformly sized
# order of tile creation is: bottom,left upwards and rightwards
def make_tiles_with_spillover(lon_min: float,
lat_min: float,
lon_max: float,
lat_max: float,
tile_size_m: float,
overlap_m: float):
"""
Create full-size tiles that may extend beyond the bounding box.
All tiles have equal size (minus floating-point delta).
Args:
lon_min: left / west-most longitude (degrees).
lat_min: bottom / south-most latitude (degrees).
lon_max: right / east-most longitude (degrees).
lat_max: top / north-most latitude (degrees).
tile_size_m: nominal tile width/height in meters (square tiles).
overlap_m: overlap between neighboring tiles in meters.
Behavior:
- All tiles have the same angular size (derived from tile_size_m).
- Neighboring tiles' *starts* are separated by
(tile_size_m - overlap_m) in both directions.
- Tiles are NOT clipped to the bounds; outer tiles may extend beyond
the user-specified bbox.
- We generate enough rows/cols so that the grid fully covers the bbox.
"""
if overlap_m < 0:
raise ValueError("overlap_m must be >= 0")
if overlap_m >= tile_size_m:
raise ValueError("overlap_m must be < tile_size_m")
# Use mid-latitude of the bbox for lon->km conversion
center_lat = (lat_min + lat_max) / 2.0
km_per_deg_lon = km_per_deg_lon_at_lat(center_lat)
deg_per_km_lat = 1.0 / KM_PER_DEG_LAT
deg_per_km_lon = 1.0 / km_per_deg_lon if km_per_deg_lon != 0 else 0.0
# Convert sizes from meters to km
tile_size_km = tile_size_m / 1000.0
overlap_km = overlap_m / 1000.0
# Tile dimensions in degrees (fixed for all tiles)
tile_height_deg = tile_size_km * deg_per_km_lat
tile_width_deg = tile_size_km * deg_per_km_lon
# Step between tile starts in degrees (tile size minus overlap)
step_km = tile_size_km - overlap_km
step_lat_deg = step_km * deg_per_km_lat
step_lon_deg = step_km * deg_per_km_lon
if step_lat_deg <= 0 or step_lon_deg <= 0:
raise ValueError("Tile step must be positive (check overlap vs tile size).")
tiles = []
# How many rows/cols do we need to cover the bbox at least once?
delta_lat = lat_max - lat_min
delta_lon = lon_max - lon_min
# Small epsilon to avoid floating point edge weirdness
eps = 1e-12
n_rows = max(1, math.ceil((delta_lat - eps) / step_lat_deg))
n_cols = max(1, math.ceil((delta_lon - eps) / step_lon_deg))
for row in range(n_rows):
cur_south = lat_min + row * step_lat_deg
cur_north = cur_south + tile_height_deg
# Height is constant in degrees, so constant in km (approx)
height_km = tile_height_deg * KM_PER_DEG_LAT
height_m = height_km * 1000.0
for col in range(n_cols):
cur_west = lon_min + col * step_lon_deg
cur_east = cur_west + tile_width_deg
# Width is constant in degrees, so constant in km at center_lat
width_km = tile_width_deg * km_per_deg_lon
width_m = width_km * 1000.0
center_lat_tile = (cur_south + cur_north) / 2.0
center_lon_tile = (cur_west + cur_east) / 2.0
tiles.append(Tile(
south=cur_south,
west=cur_west,
north=cur_north,
east=cur_east,
width_m=width_m,
height_m=height_m,
row=row,
col=col,
center_lat=center_lat_tile,
center_lon=center_lon_tile,
))
# NOTE: we still return the original requested bbox,
# not the extended grid extent.
return tiles, (lat_min, lon_min, lat_max, lon_max)
# ------------------------------------------------------------------------
# calculate and make the tiles given the bounding box
# make tiles w/o spillover
# order of tile creation is: bottom,left upwards and rightwards
def make_tiles_without_spillover(
lon_min: float,
lat_min: float,
lon_max: float,
lat_max: float,
tile_size_m: float,
overlap_m: float):
"""
Create tiles that are clipped at the provided bounding box edges.
Tiles at the north/east edge may be smaller than tile_size_m
Args:
lon_min: left / west-most longitude (degrees).
lat_min: bottom / south-most latitude (degrees).
lon_max: right / east-most longitude (degrees).
lat_max: top / north-most latitude (degrees).
tile_size_m: nominal tile width/height in meters.
overlap_m: overlap between neighboring tiles in meters.
Tiles are nominally tile_size_m x tile_size_m in meters.
Neighboring tiles overlap by overlap_m in both directions
(so the *step* between tile starts is tile_size_m - overlap_m).
Edges may be smaller if the bounding box edge cuts a tile.
"""
if overlap_m < 0:
raise ValueError("overlap_m must be >= 0")
if overlap_m >= tile_size_m:
raise ValueError("overlap_m must be < tile_size_m")
# Use mid-latitude for lon->km conversion
center_lat = (lat_min + lat_max) / 2.0
km_per_deg_lon = km_per_deg_lon_at_lat(center_lat)
deg_per_km_lat = 1.0 / KM_PER_DEG_LAT
deg_per_km_lon = 1.0 / km_per_deg_lon if km_per_deg_lon != 0 else 0.0
# Convert sizes from meters to km
tile_size_km = tile_size_m / 1000.0
overlap_km = overlap_m / 1000.0
# Tile dimensions in degrees
tile_height_deg = tile_size_km * deg_per_km_lat
tile_width_deg = tile_size_km * deg_per_km_lon
# Step between tile starts in degrees (tile size minus overlap)
step_km = tile_size_km - overlap_km
step_lat_deg = step_km * deg_per_km_lat
step_lon_deg = step_km * deg_per_km_lon
tiles = []
row = 0
cur_south = lat_min
eps = 1e-12
while cur_south < lat_max - eps:
ideal_north = cur_south + tile_height_deg
tile_north = min(ideal_north, lat_max)
height_km = (tile_north - cur_south) * KM_PER_DEG_LAT
height_m = height_km * 1000.0
col = 0
cur_west = lon_min
while cur_west < lon_max - eps:
ideal_east = cur_west + tile_width_deg
tile_east = min(ideal_east, lon_max)
width_km = (tile_east - cur_west) * km_per_deg_lon
width_m = width_km * 1000.0
# calculate center
tile_center_lat = (cur_south + tile_north) / 2.0
tile_center_lon = (cur_west + tile_east) / 2.0
tiles.append(Tile(
south=cur_south,
west=cur_west,
north=tile_north,
east=tile_east,
width_m=width_m,
height_m=height_m,
row=row,
col=col,
center_lat=tile_center_lat,
center_lon=tile_center_lon,
))
cur_west += step_lon_deg
col += 1
cur_south += step_lat_deg
row += 1
# Keep return format the same: (tiles, (lat_min, lon_min, lat_max, lon_max))
return tiles, (lat_min, lon_min, lat_max, lon_max)
# ------------------------------------------------------------------------
# ------------------------------------------------------------------------
# region checkers that matches com.openathena.core.GeoRegionChecker.java class
USA_BOUNDING_BOXES = [
# West Coast (CA, OR, WA)
(32.0, -125.0, 49.0, -114.0),
# Southwest (AZ, NM, TX, OK, AR)
(25.0, -115.0, 37.0, -93.0),
# Intermountain West (UT, NV, ID, WY, MT)
(37.0, -115.0, 49.0, -109.0),
# Southwest (Texas, New Mexico, Arizona) – duplicate of above in your Java,
# included here for fidelity; harmless but slightly redundant.
(25.0, -115.0, 37.0, -93.0),
# Midwest
(36.0, -102.0, 49.0, -89.0),
# Great Plains / corrected Midwest including Colorado
(36.0, -109.0, 49.0, -89.0),
# Southeast (FL, GA, AL, MS, LA, eastern TX panhandle)
(24.396308, -89.0, 36.5, -75.0),
# Northeast
(36.5, -89.0, 49.0, -66.93457),
# Alaska (including the Aleutian Islands)
(51.2, -179.148909, 71.538800, -129.974167),
# Hawaii
(18.7763, -178.334698, 28.402123, -154.806773),
# Puerto Rico
(17.5, -67.5, 18.5, -65.0),
# Updated Pacific Northwest bounding box to include all of Washington State
# and coastal islands
(45.5, -125.0, 49.5, -116.5),
]
# Western Europe + related regions
EUROPE_BOUNDING_BOXES = [
# Western Europe (France, Benelux, Germany)
(41.0, -5.0, 51.5, 10.5),
# Southern Europe (Spain, Portugal, Italy, Greece)
(35.0, -10.0, 45.0, 20.0),
# Northern Europe (Scandinavia)
(55.0, 5.0, 71.0, 30.0),
# Eastern Europe (Poland, Czechia, Hungary, Romania)
(45.0, 10.5, 56.0, 30.0),
# Southeast Europe (Balkans)
(39.0, 13.0, 48.0, 30.0),
# Iceland
(63.0, -25.0, 67.0, -13.0),
# British Isles (United Kingdom, Ireland, and surrounding islands)
(49.9, -14.0, 61.0, 2.0),
# Greece including all major islands
(34.0, 19.0, 42.0, 30.0),
# Cyprus
(34.5, 32.0, 35.7, 34.0),
# Low Countries (Netherlands, Belgium, Luxembourg)
(49.4, 2.5, 53.7, 7.3),
# Baltic States (Estonia, Latvia, Lithuania)
(54.0, 20.0, 59.7, 28.2),
]
def is_in_usa(latitude: float, longitude: float) -> bool:
"""
Return True if (latitude, longitude) lies inside any of the USA bounding boxes
(continental US, Alaska, Hawaii, Puerto Rico, etc.).
"""
for lat_min, lon_min, lat_max, lon_max in USA_BOUNDING_BOXES:
if (lat_min <= latitude <= lat_max and
lon_min <= longitude <= lon_max):
return True
return False
def is_in_europe(latitude: float, longitude: float) -> bool:
"""
Return True if (latitude, longitude) lies inside any of the European
bounding boxes (Western Europe, Iceland, British Isles, Cyprus, Baltics, etc.).
"""
for lat_min, lon_min, lat_max, lon_max in EUROPE_BOUNDING_BOXES:
if (lat_min <= latitude <= lat_max and
lon_min <= longitude <= lon_max):
return True
return False
def bbox_all_in_usa(lat_min: float, lon_min: float,
lat_max: float, lon_max: float) -> bool:
return bbox_all_corners_in_region(
lat_min, lon_min, lat_max, lon_max, is_in_usa
)
def bbox_all_in_europe(lat_min: float, lon_min: float,
lat_max: float, lon_max: float) -> bool:
return bbox_all_corners_in_region(
lat_min, lon_min, lat_max, lon_max, is_in_europe
)
def bbox_all_corners_in_region(
lat_min: float,
lon_min: float,
lat_max: float,
lon_max: float,
region_check: Callable[[float, float], bool],
) -> bool:
"""
Return True if *all four corners* of the bounding box are inside
the given region_check(latitude, longitude) function.
"""
corners = [
(lat_min, lon_min), # SW
(lat_min, lon_max), # SE
(lat_max, lon_min), # NW
(lat_max, lon_max), # NE
]
return all(region_check(lat, lon) for lat, lon in corners)
# ------------------------------------------------------------------------
# ------------------------------------------------------------------------
# helper functions
def km_per_deg_lon_at_lat(lat_deg: float) -> float:
"""
Approximate kilometers per degree of longitude at a given latitude.
"""
lat_rad = math.radians(lat_deg)
return KM_PER_DEG_LAT * math.cos(lat_rad)
def parse_bool(value: str) -> bool:
value = value.lower().strip()
if value in ("true", "1", "yes", "y", "on"):
return True
if value in ("false", "0", "no", "n", "off"):
return False
raise argparse.ArgumentTypeError(
f"Invalid boolean value '{value}'. Expected true/false."
)
def fmt_coord(lat, lon):
return f"{lat:.6f},{lon:.6f}"
def place_label(line_chars, label, target_comma_col):
"""
Place `label` into `line_chars` so that the comma in the label
sits at column `target_comma_col` (as closely as possible).
"""
comma_idx = label.find(',')
if comma_idx == -1:
# Fallback: pretend comma is in the middle
comma_idx = len(label) // 2
start = target_comma_col - comma_idx
if start < 0:
start = 0
if start + len(label) > LINE_WIDTH:
start = max(0, LINE_WIDTH - len(label))
for i, ch in enumerate(label):
col = start + i
if 0 <= col < LINE_WIDTH:
line_chars[col] = ch
def print_ascii_box(lat_min, lon_min, lat_max, lon_max,
center_lat, center_lon):
"""
Print an ASCII box with corner '+' characters and put the
corner coordinates on lines above/below so that each '+'
is vertically aligned with the comma in the corresponding
'lat,lon' label.
Uses only space characters for padding (no tabs), and includes
a couple of interior lines so the box isn't too squished.
"""
nw = (lat_max, lon_min)
ne = (lat_max, lon_max)
sw = (lat_min, lon_min)
se = (lat_min, lon_max)
c = (center_lat, center_lon)
nw_label = fmt_coord(*nw)
ne_label = fmt_coord(*ne)
sw_label = fmt_coord(*sw)
se_label = fmt_coord(*se)
c_label = fmt_coord(*c)
# Top coordinate labels line (NW and NE)
top_label_line = [' '] * LINE_WIDTH
place_label(top_label_line, nw_label, PLUS_LEFT_COL)
place_label(top_label_line, ne_label, PLUS_RIGHT_COL)
print("".join(top_label_line).rstrip())
# Top border line
top_border = [' '] * LINE_WIDTH
top_border[PLUS_LEFT_COL] = '+'
for col in range(PLUS_LEFT_COL + 1, PLUS_RIGHT_COL):
top_border[col] = '-'
top_border[PLUS_RIGHT_COL] = '+'
print("".join(top_border).rstrip())
# Helper: blank interior line with vertical borders
def print_blank_line():
line = [' '] * LINE_WIDTH
line[PLUS_LEFT_COL] = '|'
line[PLUS_RIGHT_COL] = '|'
print("".join(line).rstrip())
# two blank interior line above center
print_blank_line()
print_blank_line()
# Middle line with center label roughly centered in the box
middle_line = [' '] * LINE_WIDTH
middle_line[PLUS_LEFT_COL] = '|'
middle_line[PLUS_RIGHT_COL] = '|'
c_start = (PLUS_LEFT_COL + PLUS_RIGHT_COL) // 2 - len(c_label) // 2
if c_start < PLUS_LEFT_COL + 1:
c_start = PLUS_LEFT_COL + 1
if c_start + len(c_label) >= PLUS_RIGHT_COL:
c_start = max(PLUS_LEFT_COL + 1, PLUS_RIGHT_COL - 1 - len(c_label))
for i, ch in enumerate(c_label):
col = c_start + i
if PLUS_LEFT_COL < col < PLUS_RIGHT_COL:
middle_line[col] = ch
print("".join(middle_line).rstrip())
# Two blank interior line below center
print_blank_line()
print_blank_line()
# Bottom border line
bottom_border = [' '] * LINE_WIDTH
bottom_border[PLUS_LEFT_COL] = '+'
for col in range(PLUS_LEFT_COL + 1, PLUS_RIGHT_COL):
bottom_border[col] = '-'
bottom_border[PLUS_RIGHT_COL] = '+'
print("".join(bottom_border).rstrip())
# Bottom coordinate labels line (SW and SE)
bottom_label_line = [' '] * LINE_WIDTH
place_label(bottom_label_line, sw_label, PLUS_LEFT_COL)
place_label(bottom_label_line, se_label, PLUS_RIGHT_COL)
print("".join(bottom_label_line).rstrip())
# ------------------------------------------------------------------------
# print summary of the tiles we will/would download
def print_summary(tiles, bbox, tile_size_m: float,
overlap_m: float,
dry_run: bool,
center_lat: float,
center_lon: float,
server_url: str | None,
dataset: str):
global verbose
lat_min, lon_min, lat_max, lon_max = bbox
num_tiles = len(tiles)
num_rows = max((t.row for t in tiles), default=-1) + 1
num_cols = max((t.col for t in tiles), default=-1) + 1
height_km = (lat_max - lat_min) * KM_PER_DEG_LAT
km_per_deg_lon = km_per_deg_lon_at_lat(center_lat)
width_km = (lon_max - lon_min) * km_per_deg_lon
area_km2 = width_km * height_km
print("Bounding box:")
print(
" lat_min, lon_min, lat_max, lon_max: "
f"{lat_min:.6f}, {lon_min:.6f}, {lat_max:.6f}, {lon_max:.6f}"
)
print(
"Box size: "
f"width {width_km:,.3f} km, "
f"height {height_km:,.3f} km"
)
print(f"Approx area: {area_km2:,.3f} square km")
print(f"Requested tile size: {tile_size_m:,.1f} m")
print(f"Requested overlap: {overlap_m:,.1f} m")
print(f"Dataset: {dataset}")
print(
f"Tiles (rows x cols): {num_rows:,} x {num_cols:,} "
f"(total {num_tiles:,})"
)
if tiles:
min_width = min(t.width_m for t in tiles)
max_width = max(t.width_m for t in tiles)
min_height = min(t.height_m for t in tiles)
max_height = max(t.height_m for t in tiles)
print(f"Tile width (m): min {min_width:,.1f}, max {max_width:,.1f}")
print(f"Tile height (m): min {min_height:,.1f}, max {max_height:,.1f}")
else:
print("No tiles generated (check your inputs).")
if server_url:
print(f"Server URL: {server_url}")
else:
print("Server URL: (none)")
if verbose:
print_ascii_box(lat_min, lon_min, lat_max, lon_max,
center_lat, center_lon)
# ------------------------------------------------------------------------
# write out a json file containing parameters and all the tiels
def write_json(tiles, bbox,
center_lat: float,
center_lon: float,
tile_size_m: float,
overlap_m: float,
server_url: str | None,
uniform: bool,
dataset: str,
filename: str = "bulkdownload.json"):
lat_min, lon_min, lat_max, lon_max = bbox
data = {
"center": {
"lat": center_lat,
"lon": center_lon,
},
"tile_size_m": tile_size_m,
"overlap_m": overlap_m,
"server_url": server_url,
"uniform": uniform,
"dataset": dataset,
"bbox": {
"lat_min": lat_min,
"lon_min": lon_min,
"lat_max": lat_max,
"lon_max": lon_max,
},
"tiles": [
{
"row": t.row,
"col": t.col,
"south": t.south,
"west": t.west,
"north": t.north,
"east": t.east,
"width_m": t.width_m,
"height_m": t.height_m,
"tile_center_lat": t.center_lat,
"tile_center_lon": t.center_lon,
}
for t in tiles
],
}
with open(filename, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
print(f"\nWrote {len(tiles)} tiles to JSON file: {filename}")
# ------------------------------------------------------------------------
def validate_server_url_or_exit(url: str, timeout: float = 15.0) -> str:
"""
Basic sanity check for the server URL *and* a live HTTP check.
- Strips whitespace.
- Requires scheme http or https.
- Requires a non-empty host.
- Normalizes by stripping a trailing slash.
- Issues a GET request to the URL with a short timeout using urllib.
* Accepts 2xx and 3xx status codes as "OK".
- Exits the program with an error message if invalid or unreachable.
Returns the normalized URL string.
"""
if url is None:
print("ERROR: server URL is missing.", file=sys.stderr)
sys.exit(1)
url = url.strip()
if not url:
print("ERROR: server URL is empty.", file=sys.stderr)
sys.exit(1)
parsed = urlparse(url)
if parsed.scheme not in ("http", "https"):
print(
"ERROR: server URL must start with http:// or https:// "
f"(got scheme '{parsed.scheme or 'None'}').",
file=sys.stderr,
)
sys.exit(1)
if not parsed.netloc:
print(
"ERROR: server URL is missing a host "
"(expected something like https://example.com or "
"https://example.com/api).",
file=sys.stderr,
)
sys.exit(1)
# Normalize: strip trailing slash so later joins are easier
normalized = url.rstrip("/")
urlstr = normalized + "/api/v1/openathena/up?" + OPENATHENA_API_KEY
status = get_url_status_code(urlstr,method="GET", timeout=timeout)
if status == -1:
print(
f"ERROR: failed to contact server URL '{normalized}' "
"(network/DNS/timeout error).",
file=sys.stderr,
)
sys.exit(1)
elif status == -2:
print(
f"ERROR: unexpected error contacting server URL '{normalized}'.",
file=sys.stderr,
)
sys.exit(1)
elif not (200 <= status < 400):
print(
"ERROR: server URL responded with unexpected status code "
f"{status} (expected 2xx or 3xx).",
file=sys.stderr,
)
sys.exit(1)
if verbose:
print(f"{normalized} is up")
# now test to make sure we have admin permissions?
# don't need to; regular uers can perform POST to request a new Dem be downloaded
return normalized
# ------------------------------------------------------------------------
# ------------------------------------------------------------------------
def main():
global OPENATHENA_API_KEY
global verbose
parser = argparse.ArgumentParser(
description="Generate a grid of tiles over a square bounding box "
"around a center latitude/longitude."
)
parser.add_argument(
"-v", "--verbose",action="store_true",
help="Enable verbose output."
)
parser.add_argument(
"--bounds",
type=float,
nargs=4,
metavar=("LEFT", "BOTTOM", "RIGHT", "TOP"),
required=True,
help=(
"Bounding box as LEFT BOTTOM RIGHT TOP "
"(lon_min lat_min lon_max lat_max). "
"LEFT = west-most lon, RIGHT = east-most lon, "
"BOTTOM = south-most lat, TOP = north-most lat."
),
)
parser.add_argument(
"--tile-size-m",
type=float,
default=DEFAULT_TILE_SIZE_M,
help=f"Edge length of each tile in METERS (default: {DEFAULT_TILE_SIZE_M:.0f})."
)
parser.add_argument(
"--overlap-m",
type=float,
default=0.0,
help="Overlap between neighboring tiles in METERS (default: 0)."
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Print only a summary of the grid (no per-tile listing)."
)
parser.add_argument(
"--json-output",
action="store_true",
help="Also write tile data to bulkdownload.json."
)
parser.add_argument(
"--server-url",
type=str,
required=True,
help="Base URL for OpenAthenaCore API server."
)
parser.add_argument(
"--uniform",
type=parse_bool,
default=True,
help="Whether to use uniform-size tiles that spill past the bounding box "
"(true) or clipped tiles (false).",
)
parser.add_argument(
"--dataset",
type=str.upper,
choices=["SRTM","3DEP","COP30","EUDTM", "AUTO"],
required=True,
help="Elevation dataset to download: SRTM, 3DEP, COP30, EUDTM, or automatic. "
"Choose auto to let this script determine best dataset.",
)
args = parser.parse_args()
if args.verbose:
verbose = True
else:
verbose = False
left, bottom, right, top = args.bounds
lon_min, lat_min, lon_max, lat_max = left, bottom, right, top
center_lat = (lat_min + lat_max) / 2.0
center_lon = (lon_min + lon_max) / 2.0
if args.uniform:
tiles, bbox = make_tiles_with_spillover(
lon_min=lon_min,
lat_min=lat_min,
lon_max=lon_max,
lat_max=lat_max,
tile_size_m=args.tile_size_m,
overlap_m=args.overlap_m,
)
else:
tiles, bbox = make_tiles_without_spillover(
lon_min=lon_min,
lat_min=lat_min,
lon_max=lon_max,
lat_max=lat_max,
tile_size_m=args.tile_size_m,
overlap_m=args.overlap_m,
)
# Always print summary
print_summary(
tiles,
bbox,
tile_size_m=args.tile_size_m,
overlap_m=args.overlap_m,
dry_run=args.dry_run,
center_lat=center_lat,
center_lon=center_lon,
server_url=args.server_url,
dataset=args.dataset,
)
# JSON output if requested
if args.json_output:
write_json(
tiles,
bbox,
center_lat=center_lat,
center_lon=center_lon,
tile_size_m=args.tile_size_m,
overlap_m=args.overlap_m,
server_url=args.server_url,
uniform=args.uniform,
dataset=args.dataset,
filename="bulkdownload.json",
)
# pull OA API key from environment variable
if "OPENATHENA_API_KEY" in os.environ:
OPENATHENA_API_KEY=os.getenv("OPENATHENA_API_KEY")
if verbose:
print(f"Using {OPENATHENA_API_KEY} to communicate with {args.server_url}")
args.server_url = validate_server_url_or_exit(args.server_url)
# determine dataset; if automatic, check if region is in USA or Europe
# and set to 3dep or eudtm otherwise use cop30
# if user specifically set one, try that w/o checking
all_usa = bbox_all_in_usa(lat_min, lon_min, lat_max, lon_max)
all_eu = bbox_all_in_europe(lat_min, lon_min, lat_max, lon_max)
if args.dataset == "AUTO":
args.dataset="cop30"
if all_usa:
args.dataset="3dep"
if all_eu:
args.dataset="eudtm"
# now, loop through the tiles and print out (if verbose)
# tile info; if dry-run, show what we would post
# if not dry-run, do the actual post
for tile in tiles:
urlstr = f"{args.server_url}" + f"/api/v1/openathena/dem?lat={tile.center_lat:.6f}&lon={tile.center_lon:.6f}&len={tile.width_m:.0f}&dataset={args.dataset}&apikey=" + OPENATHENA_API_KEY
if verbose:
print(
f"Tile: r{tile.row} c{tile.col}: "
f"lat: [{tile.south:.6f}, {tile.north:.6f}], "
f"lon: [{tile.west:.6f}, {tile.east:.6f}], "
f"center: ({tile.center_lat:.6f},{tile.center_lon:.6f}), "
f"size: ≈ {tile.width_m:.1f} m x {tile.height_m:.1f} m"
)
print(f"url: {urlstr}")
if args.dry_run == False:
if verbose:
print(f" Going to invoke API to download tile {tile.row}x{tile.col}")
print(f" url: {urlstr}")
status = get_url_status_code(urlstr,method="POST",timeout=25)
if verbose:
print(f" POST returned {status}")
# ------------------------------------------------------------------------
# ------------------------------------------------------------------------
if __name__ == "__main__":
main()