Skip to content

Commit 1154e5a

Browse files
committed
Include daily fetch with backfill for each ADM2
1 parent 1fc7868 commit 1154e5a

File tree

3 files changed

+99
-104
lines changed

3 files changed

+99
-104
lines changed

alertflow/dags/satellite-weather/argentina.py

Lines changed: 76 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -15,79 +15,79 @@
1515
around 7 days to update the dataset.
1616
"""
1717

18-
import os
19-
import logging
20-
import calendar
21-
from datetime import timedelta, date
22-
23-
import pendulum
24-
from airflow import DAG
25-
from airflow.decorators import task
26-
from airflow.models import Variable
27-
from sqlalchemy import create_engine, text
28-
29-
from satellite import request, ADM2
30-
31-
env = os.getenv
32-
email_main = env("EMAIL_MAIN")
33-
34-
DEFAULT_ARGS = {
35-
"owner": "AlertaDengue",
36-
"depends_on_past": False,
37-
# 'email': [email_main],
38-
"email_on_failure": True,
39-
"email_on_retry": False,
40-
"retries": 2,
41-
"retry_delay": timedelta(minutes=2),
42-
}
43-
44-
45-
46-
with DAG(
47-
dag_id="COPERNICUS_ARG",
48-
description="ETL of weather data for Brazil",
49-
tags=["Argentina", "Copernicus"],
50-
schedule="@monthly",
51-
default_args=DEFAULT_ARGS,
52-
start_date=pendulum.datetime(2000, 1, 1),
53-
end_date=pendulum.datetime(2024, 1, 1),
54-
catchup=True,
55-
max_active_runs=14,
56-
) as dag:
57-
DATE = "{{ ds }}" # DAG execution date
58-
KEY = Variable.get("cdsapi_key", deserialize_json=True)
59-
URI = Variable.get("psql_main_uri", deserialize_json=True)
60-
61-
@task
62-
def fetch_ds(dt, uri, api_key):
63-
locale = "ARG"
64-
tablename = f"copernicus_{locale.lower()}"
65-
engine = create_engine(uri)
66-
dt = date.fromisoformat(dt)
67-
end_day = calendar.monthrange(dt.year, dt.month)[1]
68-
date_str = f"{dt.replace(day=1)}/{dt.replace(day=end_day)}"
69-
# with engine.connect() as conn:
70-
# cur = conn.execute(
71-
# text(
72-
# f"SELECT geocode FROM weather.{tablename}"
73-
# f" WHERE date = '{dt}'"
74-
# )
75-
# )
76-
# table_geocodes = set(chain(*cur.fetchall()))
77-
#
78-
# all_geocodes = set([adm.code for adm in ADM2.filter(adm0=locale)])
79-
# geocodes = all_geocodes.difference(table_geocodes)
80-
# print("TABLE_GEO ", f"[{len(table_geocodes)}]: ", table_geocodes)
81-
# print("DIFF_GEO: ", f"[{len(geocodes)}]: ", geocodes)
82-
83-
with request.reanalysis_era5_land(
84-
date_str.replace("/", "_") + locale,
85-
api_token=api_key,
86-
date=date_str,
87-
locale=locale,
88-
) as ds:
89-
for adm in ADM2.filter(adm0=locale):
90-
with engine.connect() as conn:
91-
ds.cope.to_sql(adm, conn, tablename, "weather")
92-
93-
fetch_ds(DATE, URI["PSQL_MAIN_URI"], KEY["CDSAPI_KEY"])
18+
# import os
19+
# import logging
20+
# import calendar
21+
# from datetime import timedelta, date
22+
#
23+
# import pendulum
24+
# from airflow import DAG
25+
# from airflow.decorators import task
26+
# from airflow.models import Variable
27+
# from sqlalchemy import create_engine, text
28+
#
29+
# from satellite import request, ADM2
30+
#
31+
# env = os.getenv
32+
# email_main = env("EMAIL_MAIN")
33+
#
34+
# DEFAULT_ARGS = {
35+
# "owner": "AlertaDengue",
36+
# "depends_on_past": False,
37+
# # 'email': [email_main],
38+
# "email_on_failure": True,
39+
# "email_on_retry": False,
40+
# "retries": 2,
41+
# "retry_delay": timedelta(minutes=2),
42+
# }
43+
#
44+
#
45+
#
46+
# with DAG(
47+
# dag_id="COPERNICUS_ARG",
48+
# description="ETL of weather data for Brazil",
49+
# tags=["Argentina", "Copernicus"],
50+
# schedule="@monthly",
51+
# default_args=DEFAULT_ARGS,
52+
# start_date=pendulum.datetime(2000, 1, 1),
53+
# end_date=pendulum.datetime(2024, 1, 1),
54+
# catchup=True,
55+
# max_active_runs=14,
56+
# ) as dag:
57+
# DATE = "{{ ds }}" # DAG execution date
58+
# KEY = Variable.get("cdsapi_key", deserialize_json=True)
59+
# URI = Variable.get("psql_main_uri", deserialize_json=True)
60+
#
61+
# @task
62+
# def fetch_ds(dt, uri, api_key):
63+
# locale = "ARG"
64+
# tablename = f"copernicus_{locale.lower()}"
65+
# engine = create_engine(uri)
66+
# dt = date.fromisoformat(dt)
67+
# end_day = calendar.monthrange(dt.year, dt.month)[1]
68+
# date_str = f"{dt.replace(day=1)}/{dt.replace(day=end_day)}"
69+
# with engine.connect() as conn:
70+
# cur = conn.execute(
71+
# text(
72+
# f"SELECT geocode FROM weather.{tablename}"
73+
# f" WHERE date = '{dt}'"
74+
# )
75+
# )
76+
# table_geocodes = set(chain(*cur.fetchall()))
77+
#
78+
# all_geocodes = set([adm.code for adm in ADM2.filter(adm0=locale)])
79+
# geocodes = all_geocodes.difference(table_geocodes)
80+
# print("TABLE_GEO ", f"[{len(table_geocodes)}]: ", table_geocodes)
81+
# print("DIFF_GEO: ", f"[{len(geocodes)}]: ", geocodes)
82+
#
83+
# with request.reanalysis_era5_land(
84+
# date_str.replace("/", "_") + locale,
85+
# api_token=api_key,
86+
# date=date_str,
87+
# locale=locale,
88+
# ) as ds:
89+
# for adm in ADM2.filter(adm0=locale):
90+
# with engine.connect() as conn:
91+
# ds.cope.to_sql(adm, conn, tablename, "weather")
92+
#
93+
# fetch_ds(DATE, URI["PSQL_MAIN_URI"], KEY["CDSAPI_KEY"])

alertflow/dags/satellite-weather/brasil.py

Lines changed: 22 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,16 @@
1616
"""
1717

1818
import os
19-
import logging
20-
import calendar
21-
from datetime import timedelta, date
19+
from datetime import date, timedelta
20+
from itertools import chain
2221

2322
import pendulum
2423
from airflow import DAG
2524
from airflow.decorators import task
2625
from airflow.models import Variable
26+
from satellite import ADM2, request
2727
from sqlalchemy import create_engine, text
2828

29-
from satellite import request, ADM2
30-
3129
env = os.getenv
3230
email_main = env("EMAIL_MAIN")
3331

@@ -42,7 +40,6 @@
4240
}
4341

4442

45-
4643
with DAG(
4744
dag_id="COPERNICUS_BRASIL",
4845
description="ETL of weather data for Brazil",
@@ -59,35 +56,33 @@
5956
URI = Variable.get("psql_main_uri", deserialize_json=True)
6057

6158
@task
62-
def fetch_ds(dt, uri, api_key):
63-
locale = "BRA"
59+
def fetch_ds(locale, dt, uri, api_key):
6460
tablename = f"copernicus_{locale.lower()}"
6561
engine = create_engine(uri)
66-
dt = date.fromisoformat(dt) # - timedelta(days=5)
67-
end_day = calendar.monthrange(dt.year, dt.month)[1]
68-
date_str = f"{dt.replace(day=1)}/{dt.replace(day=end_day)}"
69-
# with engine.connect() as conn:
70-
# cur = conn.execute(
71-
# text(
72-
# f"SELECT geocode FROM weather.{tablename}"
73-
# f" WHERE date = '{dt}'"
74-
# )
75-
# )
76-
# table_geocodes = set(chain(*cur.fetchall()))
77-
#
78-
# all_geocodes = set([adm.code for adm in ADM2.filter(adm0=locale)])
79-
# geocodes = all_geocodes.difference(table_geocodes)
80-
# print("TABLE_GEO ", f"[{len(table_geocodes)}]: ", table_geocodes)
81-
# print("DIFF_GEO: ", f"[{len(geocodes)}]: ", geocodes)
62+
dt = date.fromisoformat(dt) - timedelta(days=5)
63+
64+
with engine.connect() as conn:
65+
cur = conn.execute(
66+
text(
67+
f"SELECT geocode FROM weather.{tablename}"
68+
f" WHERE date = '{str(dt)}'"
69+
)
70+
)
71+
table_geocodes = set(chain(*cur.fetchall()))
72+
73+
all_geocodes = set([adm.code for adm in ADM2.filter(adm0=locale)])
74+
geocodes = all_geocodes.difference(table_geocodes)
75+
print("TABLE_GEO ", f"[{len(table_geocodes)}]: ", table_geocodes)
76+
print("DIFF_GEO: ", f"[{len(geocodes)}]: ", geocodes)
8277

8378
with request.reanalysis_era5_land(
84-
date_str.replace("/", "_") + locale,
79+
str(dt).replace("-", "_") + locale,
8580
api_token=api_key,
86-
date=date_str,
81+
date=str(dt),
8782
locale=locale,
8883
) as ds:
8984
for adm in ADM2.filter(adm0=locale):
9085
with engine.connect() as conn:
9186
ds.cope.to_sql(adm, conn, tablename, "weather")
9287

93-
fetch_ds(DATE, URI["PSQL_MAIN_URI"], KEY["CDSAPI_KEY"])
88+
fetch_ds("BRA", DATE, URI["PSQL_MAIN_URI"], KEY["CDSAPI_KEY"])

conda/env.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ channels:
33
- conda-forge
44
- nodefaults
55
dependencies:
6-
- python >3.10,<4
6+
- python >3.10,<3.13
77
- pip
88
- pre-commit
99
- poetry >= 1.7.1

0 commit comments

Comments (0)