Commit 1fc7868

update dag & include dag for ARG

1 parent bbb8262 commit 1fc7868

5 files changed: +3578 −445 lines changed

alertflow/airflow.cfg

Lines changed: 1 addition & 1 deletion

@@ -331,7 +331,7 @@ logging_level = INFO
 # Logging level for celery. If not set, it uses the value of logging_level
 #
 # Supported values: ``CRITICAL``, ``ERROR``, ``WARNING``, ``INFO``, ``DEBUG``.
-celery_logging_level = DEBUG
+celery_logging_level = INFO
 
 # Logging level for Flask-appbuilder UI.
 #
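This drops the Celery workers' log verbosity from DEBUG back to INFO, matching the `logging_level = INFO` visible in the hunk context. A minimal verification sketch, assuming the standard `airflow.configuration` API and that the option sits in the `[logging]` section the context line suggests:

# Minimal sketch: read the effective value back. Assumes the Airflow 2.x
# config API; the section name "logging" is inferred from the hunk above.
from airflow.configuration import conf

print(conf.get("logging", "celery_logging_level"))  # expected: "INFO"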
New file

Lines changed: 93 additions & 0 deletions

@@ -0,0 +1,93 @@
+"""
+Author: Luã Bida Vacaro
+Email: luabidaa@gmail.com
+Github: https://github.com/luabida
+Date: 2023-04-13
+
+The COPERNICUS_ARG Airflow DAG will collect daily weather
+data from the Copernicus ERA5 Land Reanalysis dataset for all
+cities in Argentina. This data includes temperature, precipitation,
+humidity, and atmospheric pressure, which is collected daily
+starting from January 1st, 2000 to the present day.
+
+To ensure reliability and safety, the DAG has a 9-day delay
+from the current date, as the Copernicus API usually takes
+around 7 days to update the dataset.
+"""
+
+import os
+import logging
+import calendar
+from datetime import timedelta, date
+
+import pendulum
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models import Variable
+from sqlalchemy import create_engine, text
+
+from satellite import request, ADM2
+
+env = os.getenv
+email_main = env("EMAIL_MAIN")
+
+DEFAULT_ARGS = {
+    "owner": "AlertaDengue",
+    "depends_on_past": False,
+    # 'email': [email_main],
+    "email_on_failure": True,
+    "email_on_retry": False,
+    "retries": 2,
+    "retry_delay": timedelta(minutes=2),
+}
+
+
+
+with DAG(
+    dag_id="COPERNICUS_ARG",
+    description="ETL of weather data for Argentina",
+    tags=["Argentina", "Copernicus"],
+    schedule="@monthly",
+    default_args=DEFAULT_ARGS,
+    start_date=pendulum.datetime(2000, 1, 1),
+    end_date=pendulum.datetime(2024, 1, 1),
+    catchup=True,
+    max_active_runs=14,
+) as dag:
+    DATE = "{{ ds }}"  # DAG execution date
+    KEY = Variable.get("cdsapi_key", deserialize_json=True)
+    URI = Variable.get("psql_main_uri", deserialize_json=True)
+
+    @task
+    def fetch_ds(dt, uri, api_key):
+        locale = "ARG"
+        tablename = f"copernicus_{locale.lower()}"
+        engine = create_engine(uri)
+        dt = date.fromisoformat(dt)
+        end_day = calendar.monthrange(dt.year, dt.month)[1]
+        date_str = f"{dt.replace(day=1)}/{dt.replace(day=end_day)}"
+        # with engine.connect() as conn:
+        #     cur = conn.execute(
+        #         text(
+        #             f"SELECT geocode FROM weather.{tablename}"
+        #             f" WHERE date = '{dt}'"
+        #         )
+        #     )
+        #     table_geocodes = set(chain(*cur.fetchall()))
+        #
+        # all_geocodes = set([adm.code for adm in ADM2.filter(adm0=locale)])
+        # geocodes = all_geocodes.difference(table_geocodes)
+        # print("TABLE_GEO ", f"[{len(table_geocodes)}]: ", table_geocodes)
+        # print("DIFF_GEO: ", f"[{len(geocodes)}]: ", geocodes)
+
+        with request.reanalysis_era5_land(
+            date_str.replace("/", "_") + locale,
+            api_token=api_key,
+            date=date_str,
+            locale=locale,
+        ) as ds:
+            for adm in ADM2.filter(adm0=locale):
+                with engine.connect() as conn:
+                    ds.cope.to_sql(adm, conn, tablename, "weather")
+
+    fetch_ds(DATE, URI["PSQL_MAIN_URI"], KEY["CDSAPI_KEY"])
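For reference, the `date_str` built inside `fetch_ds` spans the full calendar month of the templated `{{ ds }}` date, since `calendar.monthrange` returns the month's last day. A standalone sketch of that computation (the sample date is illustrative):

import calendar
from datetime import date

dt = date.fromisoformat("2000-02-15")                # "{{ ds }}" arrives as an ISO string
end_day = calendar.monthrange(dt.year, dt.month)[1]  # 29 -- monthrange handles leap years
date_str = f"{dt.replace(day=1)}/{dt.replace(day=end_day)}"
print(date_str)                                      # 2000-02-01/2000-02-29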

alertflow/dags/satellite-weather/brasil.py

Lines changed: 41 additions & 91 deletions

@@ -16,11 +16,15 @@
 """
 
 import os
-from datetime import timedelta
+import logging
+import calendar
+from datetime import timedelta, date
 
 import pendulum
 from airflow import DAG
 from airflow.decorators import task
+from airflow.models import Variable
+from sqlalchemy import create_engine, text
 
 from satellite import request, ADM2
 
@@ -38,106 +42,52 @@
 }
 
 
+
 with DAG(
     dag_id="COPERNICUS_BRASIL",
     description="ETL of weather data for Brazil",
     tags=["Brasil", "Copernicus"],
     schedule="@monthly",
     default_args=DEFAULT_ARGS,
-    start_date=pendulum.datetime(2024, 1, 1),
+    start_date=pendulum.datetime(2000, 1, 1),
+    end_date=pendulum.datetime(2024, 1, 1),
     catchup=True,
     max_active_runs=14,
-):
-    from airflow.models import Variable
-
+) as dag:
     DATE = "{{ ds }}"  # DAG execution date
-    DATA_DIR = "/tmp/copernicus"
     KEY = Variable.get("cdsapi_key", deserialize_json=True)
     URI = Variable.get("psql_main_uri", deserialize_json=True)
 
-    print(DATE)
-    print(KEY)
-    print(URI)
+    @task
+    def fetch_ds(dt, uri, api_key):
+        locale = "BRA"
+        tablename = f"copernicus_{locale.lower()}"
+        engine = create_engine(uri)
+        dt = date.fromisoformat(dt)  # - timedelta(days=5)
+        end_day = calendar.monthrange(dt.year, dt.month)[1]
+        date_str = f"{dt.replace(day=1)}/{dt.replace(day=end_day)}"
+        # with engine.connect() as conn:
+        #     cur = conn.execute(
+        #         text(
+        #             f"SELECT geocode FROM weather.{tablename}"
+        #             f" WHERE date = '{dt}'"
+        #         )
+        #     )
+        #     table_geocodes = set(chain(*cur.fetchall()))
+        #
+        # all_geocodes = set([adm.code for adm in ADM2.filter(adm0=locale)])
+        # geocodes = all_geocodes.difference(table_geocodes)
+        # print("TABLE_GEO ", f"[{len(table_geocodes)}]: ", table_geocodes)
+        # print("DIFF_GEO: ", f"[{len(geocodes)}]: ", geocodes)
+
+        with request.reanalysis_era5_land(
+            date_str.replace("/", "_") + locale,
+            api_token=api_key,
+            date=date_str,
+            locale=locale,
+        ) as ds:
+            for adm in ADM2.filter(adm0=locale):
+                with engine.connect() as conn:
+                    ds.cope.to_sql(adm, conn, tablename, "weather")
 
-    # # fmt: off
-    # @task.external_python(
-    #     task_id="daily_fetch",
-    #     python="/opt/py310/bin/python3.10"
-    # )
-    # # fmt: on
-    # def extract_transform_load(
-    #     date: str, data_dir: str, api_key: str, psql_uri: str
-    # ) -> str:
-    #     """
-    #     Due to incompatibility issues between Airflow's Python version
-    #     and the satellite-weather-downloader (SWD) package, this task
-    #     will be executed in a dedicated virtual environment, which
-    #     includes a pre-installed Python3.10 interpreter within the
-    #     container. All imports must be within the scope of the task,
-    #     and XCom sharing between tasks is not allowed.
-    #
-    #     The task is designed to receive the execution date and download
-    #     the weather dataset for that specific day. After downloading,
-    #     the data is transformed using Xarray and inserted into the Main
-    #     Postgres DB, as specified in the .env file, in the form of a
-    #     DataFrame containing the weather information.
-    #     """
-    #     from datetime import timedelta
-    #     from itertools import chain
-    #     from pathlib import Path
-    #
-    #     from dateutil import parser
-    #     from satellite import downloader as sat_d
-    #     from satellite import weather as sat_w
-    #     from satellite.weather.brazil.extract_latlons import MUNICIPALITIES
-    #     from sqlalchemy import create_engine, text
-    #
-    #     start_date = parser.parse(str(date))
-    #     max_update_delay = start_date - timedelta(days=6)
-    #
-    #     with create_engine(psql_uri["PSQL_MAIN_URI"]).connect() as conn:
-    #         cur = conn.execute(
-    #             text(
-    #                 "SELECT geocodigo FROM weather.copernicus_brasil"
-    #                 f" WHERE date = '{str(max_update_delay.date())}'"
-    #             )
-    #         )
-    #         table_geocodes = set(chain(*cur.fetchall()))
-    #
-    #     all_geocodes = set([mun["geocodigo"] for mun in MUNICIPALITIES])
-    #     geocodes = all_geocodes.difference(table_geocodes)
-    #     print("TABLE_GEO ", f"[{len(table_geocodes)}]: ", table_geocodes)
-    #     print("DIFF_GEO: ", f"[{len(geocodes)}]: ", geocodes)
-    #
-    #     if not geocodes:
-    #         return "There is no geocode to fetch"
-    #
-    #     # Downloads daily dataset
-    #     netcdf_file = sat_d.download_br_netcdf(
-    #         date=str(max_update_delay.date()),
-    #         data_dir=data_dir,
-    #         user_key=api_key["CDSAPI_KEY"],
-    #     )
-    #
-    #     print(f"Handling {netcdf_file}")
-    #
-    #     # Reads the NetCDF4 file using Xarray
-    #     ds = sat_w.load_dataset(netcdf_file)
-    #
-    #     with create_engine(psql_uri["PSQL_MAIN_URI"]).connect() as conn:
-    #         ds.copebr.to_sql(
-    #             tablename="copernicus_brasil",
-    #             schema="weather",
-    #             geocodes=list(geocodes),
-    #             con=conn,
-    #         )
-    #
-    #     # Deletes the NetCDF4 file
-    #     Path(netcdf_file).unlink(missing_ok=True)
-    #
-    #     return f"{len(geocodes)} inserted into DB."
-    #
-    # # Instantiate the Task
-    # ETL = extract_transform_load(DATE, DATA_DIR, KEY, URI)
-    #
-    # ETL  # Execute
+    fetch_ds(DATE, URI["PSQL_MAIN_URI"], KEY["CDSAPI_KEY"])
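Both DAGs now read their credentials from two Airflow Variables with `deserialize_json=True`, so each Variable must store a JSON object holding the key the task call unwraps. Note also that with `start_date=2000-01-01`, `end_date=2024-01-01`, a `@monthly` schedule and `catchup=True`, each DAG backfills roughly 288 monthly runs, at most 14 at a time (`max_active_runs=14`). A hedged sketch of the expected Variable shapes (the inner key names come from the DAG code; the values are placeholders):

# Hypothetical Variable values (Admin -> Variables); only the inner key
# names "PSQL_MAIN_URI" and "CDSAPI_KEY" are taken from the DAG code.
cdsapi_key = {"CDSAPI_KEY": "<uid>:<cds-api-token>"}
psql_main_uri = {"PSQL_MAIN_URI": "postgresql://user:password@host:5432/dbname"}

# The task call then unwraps the plain strings:
# fetch_ds(DATE, psql_main_uri["PSQL_MAIN_URI"], cdsapi_key["CDSAPI_KEY"])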
