|
16 | 16 | """ |
17 | 17 |
|
18 | 18 | import os |
| 19 | +from pathlib import Path |
19 | 20 | from datetime import date, timedelta |
20 | 21 | from itertools import chain |
21 | 22 |
|
|
35 | 36 | # 'email': [email_main], |
36 | 37 | "email_on_failure": True, |
37 | 38 | "email_on_retry": False, |
38 | | - "retries": 2, |
39 | | - "retry_delay": timedelta(minutes=2), |
| 39 | + "retries": 5, |
| 40 | + "retry_delay": timedelta(seconds=30), |
40 | 41 | } |
41 | 42 |
|
42 | 43 |
|
|
47 | 48 | schedule="@monthly", |
48 | 49 | default_args=DEFAULT_ARGS, |
49 | 50 | start_date=pendulum.datetime(2000, 1, 1), |
50 | | - end_date=pendulum.datetime(2024, 1, 1), |
51 | 51 | catchup=True, |
52 | 52 | max_active_runs=14, |
53 | 53 | ) as dag: |
@@ -75,14 +75,20 @@ def fetch_ds(locale, dt, uri, api_key): |
75 | 75 | print("TABLE_GEO ", f"[{len(table_geocodes)}]: ", table_geocodes) |
76 | 76 | print("DIFF_GEO: ", f"[{len(geocodes)}]: ", geocodes) |
77 | 77 |
|
| 78 | + basename = str(dt).replace("-", "_") + locale |
78 | 79 | with request.reanalysis_era5_land( |
79 | | - str(dt).replace("-", "_") + locale, |
| 80 | + basename, |
80 | 81 | api_token=api_key, |
81 | 82 | date=str(dt), |
82 | 83 | locale=locale, |
83 | 84 | ) as ds: |
84 | | - for adm in ADM2.filter(adm0=locale): |
| 85 | + for geocode in geocodes: |
| 86 | + adm = ADM2.get(code=geocode): |
85 | 87 | with engine.connect() as conn: |
86 | 88 | ds.cope.to_sql(adm, conn, tablename, "weather") |
| 89 | + file = Path(f"{basename}.zip") |
| 90 | + if file.exists(): |
| 91 | + file.unlink() |
| 92 | + print(f"{file} removed") |
87 | 93 |
|
88 | 94 | fetch_ds("BRA", DATE, URI["PSQL_MAIN_URI"], KEY["CDSAPI_KEY"]) |
0 commit comments