     dag_id='COPERNICUS_BRASIL',
     description='ETL of weather data for Brazil',
     tags=['Brasil', 'Copernicus'],
-    schedule='@monthly',
+    schedule='@daily',
     default_args=DEFAULT_ARGS,
-    start_date=pendulum.datetime(2015, 1, 16),
+    start_date=pendulum.datetime(2014, 1, 1),
     catchup=True,
-    max_active_runs=4,
+    max_active_runs=14,
 ):
     from airflow.models import Variable
 
@@ -57,7 +57,7 @@
     )
     def extract_transform_load(
         date: str, data_dir: str, api_key: str, psql_uri: str
-    ) -> None:
+    ) -> str:
         """
         Due to incompatibility issues between Airflow's Python version
         and the satellite-weather-downloader (SWD) package, this task
@@ -72,8 +72,8 @@ def extract_transform_load(
         Postgres DB, as specified in the .env file, in the form of a
         DataFrame containing the weather information.
         """
-        import calendar
-        from datetime import datetime
+        from datetime import timedelta
+        from itertools import chain
         from pathlib import Path
 
         from dateutil import parser
@@ -83,25 +83,26 @@
         from sqlalchemy import create_engine
 
         start_date = parser.parse(str(date))
-        # max_update_delay = start_date - timedelta(days=8)
+        max_update_delay = start_date - timedelta(days=8)
 
-        if start_date.month == 1:
-            ini_date = datetime(start_date.year - 1, 12, 1).date()
-        else:
-            ini_date = datetime(
-                start_date.year, start_date.month - 1, 1
-            ).date()
+        with create_engine(psql_uri['PSQL_MAIN_URI']).connect() as conn:
+            cur = conn.execute(
+                'SELECT geocodigo FROM weather.copernicus_brasil'
+                f" WHERE date = '{str(max_update_delay.date())}'"
+            )
+            table_geocodes = set(chain(*cur.fetchall()))
 
-        end_date = datetime(
-            ini_date.year,
-            ini_date.month,
-            calendar.monthrange(ini_date.year, ini_date.month)[1],
-        ).date()
+        all_geocodes = set([mun['geocodigo'] for mun in MUNICIPIOS])
+        geocodes = all_geocodes.difference(table_geocodes)
+        print('TABLE_GEO', f'[{len(table_geocodes)}]:', table_geocodes)
+        print('DIFF_GEO:', f'[{len(geocodes)}]:', geocodes)
+
+        if not geocodes:
+            return 'There is no geocode to fetch'
 
         # Downloads daily dataset
         netcdf_file = sat_d.download_br_netcdf(
-            date=str(ini_date),
-            date_end=str(end_date),
+            date=str(max_update_delay.date()),
             data_dir=data_dir,
             user_key=api_key['CDSAPI_KEY'],
         )
@@ -110,51 +111,20 @@
 
         # Reads the NetCDF4 file using Xarray
         ds = sat_w.load_dataset(netcdf_file)
-        geocodes = [mun['geocodigo'] for mun in MUNICIPIOS]
-
-        def geocode_in_db(geocode: int) -> bool:
-            # Checks if date has been already inserted into DB
-            try:
-                with create_engine(
-                    psql_uri['PSQL_MAIN_URI']
-                ).connect() as conn:
-                    cur = conn.execute(
-                        'SELECT EXISTS ('
-                        ' SELECT FROM weather.copernicus_brasil'
-                        f" WHERE date = '{ini_date}' AND geocodigo = {geocode}"
-                        ')'
-                    )
-                    return cur.fetchone()[0]
-            except Exception as e:
-                if 'UndefinedTable' in str(e):
-                    # For dev purposes, in case table was not found
-                    print('First insertion')
-                    return False
-                else:
-                    raise e
-
-        for geocode in geocodes:
-            print(f'Handling {geocode}', flush=True)
-
-            if geocode_in_db(geocode):
-                print(f'{geocode} already in DB')
-                continue
-
-            try:
-                ds.copebr.to_sql(
-                    tablename='copernicus_brasil',
-                    schema='weather',
-                    geocodes=geocode,
-                    sql_uri=psql_uri['PSQL_MAIN_URI'],
-                )
-                print(f'{geocode} inserted into DB')
-            except Exception as e:
-                print(type(e))
-                continue
+
+        with create_engine(psql_uri['PSQL_MAIN_URI']).connect() as conn:
+            ds.copebr.to_sql(
+                tablename='copernicus_brasil',
+                schema='weather',
+                geocodes=list(geocodes),
+                con=conn,
+            )
 
         # Deletes the NetCDF4 file
         Path(netcdf_file).unlink(missing_ok=True)
 
+        return f'{len(geocodes)} inserted into DB.'
+
     # Instantiate the Task
     ETL = extract_transform_load(DATE, DATA_DIR, KEY, URI)
 
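The decorator that ends with the closing parenthesis on file line 57 sits outside this excerpt, so only its intent is visible from the docstring: the SWD package cannot run under Airflow's own Python version, so the task body runs in a separate interpreter. As a minimal, hedged sketch of that isolation pattern (not the project's actual decorator or configuration), Airflow's @task.external_python can point a TaskFlow function at a pre-built virtualenv. The DAG id, interpreter path, and task body below are illustrative assumptions:

import pendulum
from airflow import DAG
from airflow.decorators import task

with DAG(
    dag_id='EXAMPLE_ISOLATED_TASK',  # illustrative id, not the project's DAG
    schedule='@daily',
    start_date=pendulum.datetime(2014, 1, 1),
    catchup=False,
):
    # Run the callable in a pre-built interpreter that has the heavy
    # dependencies installed, so its Python version does not have to
    # match the Airflow scheduler's.
    @task.external_python(python='/opt/envs/etl/bin/python')  # hypothetical path
    def isolated_task(date: str) -> str:
        # Imports live inside the function so they resolve in the
        # external environment, not in Airflow's.
        from pathlib import Path

        return f'ran for {date}, scratch dir: {Path("/tmp")}'

    isolated_task('{{ ds }}')

The function arguments are regular strings here, so templated values such as '{{ ds }}' are rendered by Airflow before the external interpreter is invoked; any heavy imports stay inside the task body, mirroring the in-function imports visible in the diff above.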