
Commit 64d3ad9

chore(copernicus): implement the new Copernicus api ETL (#34)
* chore(copernicus): implement the new Copernicus api ETL
* remove conda from container; update airflow to latest; clean unused tasks
* update dag & include dag for ARG
* include daily fetch with backfill for each adm 2
1 parent 323c35c commit 64d3ad9

File tree

10 files changed: +5862 -777 lines changed

alertflow/airflow.cfg

Lines changed: 1 addition & 1 deletion

@@ -331,7 +331,7 @@ logging_level = INFO
 # Logging level for celery. If not set, it uses the value of logging_level
 #
 # Supported values: ``CRITICAL``, ``ERROR``, ``WARNING``, ``INFO``, ``DEBUG``.
-celery_logging_level = DEBUG
+celery_logging_level = INFO

 # Logging level for Flask-appbuilder UI.
 #
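The change quiets the Celery worker logs from DEBUG down to INFO. As a side note, the effective value can be read back through Airflow's config API; a minimal sketch, assuming a standard Airflow install and that celery_logging_level sits in the [logging] section, as the hunk context suggests:

# Minimal sketch: read the Celery log level back from the loaded config.
# Assumes Airflow is installed and airflow.cfg is on its default path.
from airflow.configuration import conf

level = conf.get("logging", "celery_logging_level")
print(f"Celery workers log at: {level}")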

alertflow/dags/episcanner/episcanner_export_data.py

Lines changed: 0 additions & 103 deletions
This file was deleted.
Lines changed: 93 additions & 0 deletions

@@ -0,0 +1,93 @@
+"""
+Author: Luã Bida Vacaro
+
+Github: https://github.com/luabida
+Date: 2023-04-13
+
+The COPERNICUS_ARG Airflow DAG will collect daily weather
+data from the Copernicus ERA5 Land Reanalysis dataset for all
+cities in Argentina. This data includes temperature, precipitation,
+humidity, and atmospheric pressure, which is collected daily
+starting from January 1st, 2000 to the present day.
+
+To ensure reliability and safety, the DAG has a 9-day delay
+from the current date, as the Copernicus API usually takes
+around 7 days to update the dataset.
+"""
+
+# import os
+# import logging
+# import calendar
+# from datetime import timedelta, date
+#
+# import pendulum
+# from airflow import DAG
+# from airflow.decorators import task
+# from airflow.models import Variable
+# from sqlalchemy import create_engine, text
+#
+# from satellite import request, ADM2
+#
+# env = os.getenv
+# email_main = env("EMAIL_MAIN")
+#
+# DEFAULT_ARGS = {
+#     "owner": "AlertaDengue",
+#     "depends_on_past": False,
+#     # 'email': [email_main],
+#     "email_on_failure": True,
+#     "email_on_retry": False,
+#     "retries": 2,
+#     "retry_delay": timedelta(minutes=2),
+# }
+#
+#
+#
+# with DAG(
+#     dag_id="COPERNICUS_ARG",
+#     description="ETL of weather data for Brazil",
+#     tags=["Argentina", "Copernicus"],
+#     schedule="@monthly",
+#     default_args=DEFAULT_ARGS,
+#     start_date=pendulum.datetime(2000, 1, 1),
+#     end_date=pendulum.datetime(2024, 1, 1),
+#     catchup=True,
+#     max_active_runs=14,
+# ) as dag:
+#     DATE = "{{ ds }}"  # DAG execution date
+#     KEY = Variable.get("cdsapi_key", deserialize_json=True)
+#     URI = Variable.get("psql_main_uri", deserialize_json=True)
+#
+#     @task
+#     def fetch_ds(dt, uri, api_key):
+#         locale = "ARG"
+#         tablename = f"copernicus_{locale.lower()}"
+#         engine = create_engine(uri)
+#         dt = date.fromisoformat(dt)
+#         end_day = calendar.monthrange(dt.year, dt.month)[1]
+#         date_str = f"{dt.replace(day=1)}/{dt.replace(day=end_day)}"
+#         with engine.connect() as conn:
+#             cur = conn.execute(
+#                 text(
+#                     f"SELECT geocode FROM weather.{tablename}"
+#                     f" WHERE date = '{dt}'"
+#                 )
+#             )
+#             table_geocodes = set(chain(*cur.fetchall()))
+#
+#         all_geocodes = set([adm.code for adm in ADM2.filter(adm0=locale)])
+#         geocodes = all_geocodes.difference(table_geocodes)
+#         print("TABLE_GEO ", f"[{len(table_geocodes)}]: ", table_geocodes)
+#         print("DIFF_GEO: ", f"[{len(geocodes)}]: ", geocodes)
+#
+#         with request.reanalysis_era5_land(
+#             date_str.replace("/", "_") + locale,
+#             api_token=api_key,
+#             date=date_str,
+#             locale=locale,
+#         ) as ds:
+#             for adm in ADM2.filter(adm0=locale):
+#                 with engine.connect() as conn:
+#                     ds.cope.to_sql(adm, conn, tablename, "weather")
+#
+#     fetch_ds(DATE, URI["PSQL_MAIN_URI"], KEY["CDSAPI_KEY"])
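The whole DAG body above ships commented out, so Airflow parses the file without registering COPERNICUS_ARG. Two details worth flagging if it is ever re-enabled as-is: fetch_ds calls chain without a `from itertools import chain` import, and the description still reads "Brazil". The month-window logic it encodes can be exercised standalone; a minimal sketch, using only the standard library:

# Minimal sketch of the month-window computation in fetch_ds above:
# given an execution date (Airflow's "{{ ds }}"), build the
# "first-day/last-day" range string passed to the Copernicus request.
import calendar
from datetime import date

def month_range(dt_iso: str) -> str:
    dt = date.fromisoformat(dt_iso)
    end_day = calendar.monthrange(dt.year, dt.month)[1]  # last day of month
    return f"{dt.replace(day=1)}/{dt.replace(day=end_day)}"

print(month_range("2023-04-13"))  # -> 2023-04-01/2023-04-30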

alertflow/dags/satellite-weather/brasil.py

Lines changed: 29 additions & 78 deletions

@@ -16,11 +16,15 @@
 """

 import os
-from datetime import timedelta
+from datetime import date, timedelta
+from itertools import chain

 import pendulum
 from airflow import DAG
 from airflow.decorators import task
+from airflow.models import Variable
+from satellite import ADM2, request
+from sqlalchemy import create_engine, text

 env = os.getenv
 email_main = env("EMAIL_MAIN")
@@ -40,98 +44,45 @@
     dag_id="COPERNICUS_BRASIL",
     description="ETL of weather data for Brazil",
     tags=["Brasil", "Copernicus"],
-    schedule="@daily",
+    schedule="@monthly",
     default_args=DEFAULT_ARGS,
-    start_date=pendulum.datetime(2024, 1, 1),
+    start_date=pendulum.datetime(2000, 1, 1),
+    end_date=pendulum.datetime(2024, 1, 1),
     catchup=True,
     max_active_runs=14,
-):
-    from airflow.models import Variable
-
+) as dag:
     DATE = "{{ ds }}"  # DAG execution date
-    DATA_DIR = "/tmp/copernicus"
     KEY = Variable.get("cdsapi_key", deserialize_json=True)
     URI = Variable.get("psql_main_uri", deserialize_json=True)

-    # fmt: off
-    @task.external_python(
-        task_id="daily_fetch",
-        python="/opt/py310/bin/python3.10"
-    )
-    # fmt: on
-    def extract_transform_load(
-        date: str, data_dir: str, api_key: str, psql_uri: str
-    ) -> str:
-        """
-        Due to incompatibility issues between Airflow's Python version
-        and the satellite-weather-downloader (SWD) package, this task
-        will be executed in a dedicated virtual environment, which
-        includes a pre-installed Python3.10 interpreter within the
-        container. All imports must be within the scope of the task,
-        and XCom sharing between tasks is not allowed.
-
-        The task is designed to receive the execution date and download
-        the weather dataset for that specific day. After downloading,
-        the data is transformed using Xarray and inserted into the Main
-        Postgres DB, as specified in the .env file, in the form of a
-        DataFrame containing the weather information.
-        """
-        from datetime import timedelta
-        from itertools import chain
-        from pathlib import Path
-
-        from dateutil import parser
-        from satellite import downloader as sat_d
-        from satellite import weather as sat_w
-        from satellite.weather.brazil.extract_latlons import MUNICIPALITIES
-        from sqlalchemy import create_engine, text
-
-        start_date = parser.parse(str(date))
-        max_update_delay = start_date - timedelta(days=6)
+    @task
+    def fetch_ds(locale, dt, uri, api_key):
+        tablename = f"copernicus_{locale.lower()}"
+        engine = create_engine(uri)
+        dt = date.fromisoformat(dt) - timedelta(days=5)

-        with create_engine(psql_uri["PSQL_MAIN_URI"]).connect() as conn:
+        with engine.connect() as conn:
             cur = conn.execute(
                 text(
-                    "SELECT geocodigo FROM weather.copernicus_brasil"
-                    f" WHERE date = '{str(max_update_delay.date())}'"
+                    f"SELECT geocode FROM weather.{tablename}"
+                    f" WHERE date = '{str(dt)}'"
                 )
             )
             table_geocodes = set(chain(*cur.fetchall()))

-        all_geocodes = set([mun["geocodigo"] for mun in MUNICIPALITIES])
+        all_geocodes = set([adm.code for adm in ADM2.filter(adm0=locale)])
         geocodes = all_geocodes.difference(table_geocodes)
         print("TABLE_GEO ", f"[{len(table_geocodes)}]: ", table_geocodes)
         print("DIFF_GEO: ", f"[{len(geocodes)}]: ", geocodes)

-        if not geocodes:
-            return "There is no geocode to fetch"
-
-        # Downloads daily dataset
-        netcdf_file = sat_d.download_br_netcdf(
-            date=str(max_update_delay.date()),
-            data_dir=data_dir,
-            user_key=api_key["CDSAPI_KEY"],
-        )
-
-        print(f"Handling {netcdf_file}")
-
-        # Reads the NetCDF4 file using Xarray
-        ds = sat_w.load_dataset(netcdf_file)
-
-        with create_engine(psql_uri["PSQL_MAIN_URI"]).connect() as conn:
-            ds.copebr.to_sql(
-                tablename="copernicus_brasil",
-                schema="weather",
-                geocodes=list(geocodes),
-                con=conn,
-            )
-
-        # Deletes the NetCDF4 file
-        Path(netcdf_file).unlink(missing_ok=True)
-
-        return f"{len(geocodes)} inserted into DB."
-
-    # Instantiate the Task
-    ETL = extract_transform_load(DATE, DATA_DIR, KEY, URI)
-
-    ETL  # Execute
+        with request.reanalysis_era5_land(
+            str(dt).replace("-", "_") + locale,
+            api_token=api_key,
+            date=str(dt),
+            locale=locale,
+        ) as ds:
+            for adm in ADM2.filter(adm0=locale):
+                with engine.connect() as conn:
+                    ds.cope.to_sql(adm, conn, tablename, "weather")
+
+    fetch_ds("BRA", DATE, URI["PSQL_MAIN_URI"], KEY["CDSAPI_KEY"])
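The refactored fetch_ds keeps the old idempotency guard: geocodes already written for the target date are diffed away, so reruns only fetch what is missing. A minimal sketch of that set logic, with hypothetical geocode values standing in for ADM2.filter(adm0=locale):

# Minimal sketch of the idempotency check in fetch_ds: flatten the
# one-column rows the driver returns, then diff against all known
# ADM2 geocodes. The codes below are hypothetical placeholders.
from itertools import chain

rows = [(3304557,), (3550308,)]             # shape of cur.fetchall()
table_geocodes = set(chain(*rows))          # {3304557, 3550308}

all_geocodes = {3304557, 3550308, 4106902}  # e.g. every ADM2 code for "BRA"
geocodes = all_geocodes.difference(table_geocodes)
print(geocodes)                             # -> {4106902}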
